Skip to content

Commit

Permalink
feat: [torrust#469] import torrent statistics in batches
Browse files Browse the repository at this point in the history
Instead of importing all torrents statistics from the tracker every
hour, this change imports 50 torrents every 100 milliseconds.

Batches only include torrents that have not been updated in the last
hour (or whatever is the `torrent_info_update_interval` value in the
configuration).

This change avoid loading the whole set of torrents in memory every time
the importation starts.

In the future, It could also allow to handle the nunber of request per
second to the tracker (statically, from config value or dinamically,
depending on the tracler load).

Althought it saves memory, it runs more SQL queries to get the list of
torrents pending to update.
  • Loading branch information
josecelano committed Mar 12, 2024
1 parent ff9f5ff commit a1e8ae9
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 21 deletions.
48 changes: 36 additions & 12 deletions src/console/cronjobs/tracker_statistics_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ use axum::extract::State;
use axum::routing::{get, post};
use axum::{Json, Router};
use chrono::{DateTime, Utc};
use log::{error, info};
use log::{debug, error, info};
use serde_json::{json, Value};
use text_colorizer::Colorize;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;

use crate::tracker::statistics_importer::StatisticsImporter;
use crate::utils::clock::seconds_ago_utc;

const IMPORTER_API_IP: &str = "127.0.0.1";

Expand All @@ -41,7 +43,7 @@ struct ImporterState {
#[must_use]
pub fn start(
importer_port: u16,
torrent_info_update_interval: u64,
torrent_stats_update_interval: u64,
tracker_statistics_importer: &Arc<StatisticsImporter>,
) -> JoinHandle<()> {
let weak_tracker_statistics_importer = Arc::downgrade(tracker_statistics_importer);
Expand All @@ -54,7 +56,7 @@ pub fn start(
let _importer_api_handle = tokio::spawn(async move {
let import_state = Arc::new(ImporterState {
last_heartbeat: Arc::new(Mutex::new(Utc::now())),
torrent_info_update_interval,
torrent_info_update_interval: torrent_stats_update_interval,
});

let app = Router::new()
Expand All @@ -81,25 +83,47 @@ pub fn start(

info!("Tracker statistics importer cronjob starting ...");

let interval = std::time::Duration::from_secs(torrent_info_update_interval);
let mut interval = tokio::time::interval(interval);
let execution_interval_in_milliseconds = 100;
let execution_interval_duration = std::time::Duration::from_millis(execution_interval_in_milliseconds);
let mut execution_interval = tokio::time::interval(execution_interval_duration);

interval.tick().await; // first tick is immediate...
execution_interval.tick().await; // first tick is immediate...

loop {
interval.tick().await;

info!("Running tracker statistics importer ...");
info!("Running tracker statistics importer every {execution_interval_in_milliseconds} milliseconds ...");

loop {
if let Err(e) = send_heartbeat(importer_port).await {
error!("Failed to send heartbeat from importer cronjob: {}", e);
}

if let Some(tracker) = weak_tracker_statistics_importer.upgrade() {
drop(tracker.import_all_torrents_statistics().await);
if let Some(statistics_importer) = weak_tracker_statistics_importer.upgrade() {
let one_interval_ago = seconds_ago_utc(
torrent_stats_update_interval
.try_into()
.expect("update interval should be a positive integer"),
);
let limit = 50;

debug!(
"Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...",
one_interval_ago.to_string().yellow(),
limit.to_string().yellow()
);

match statistics_importer
.import_torrents_statistics_not_updated_since(one_interval_ago, limit)
.await
{
Ok(()) => {}
Err(e) => error!("Failed to import statistics: {:?}", e),
}

drop(statistics_importer);
} else {
break;
}

execution_interval.tick().await;
}
})
}
Expand Down
9 changes: 8 additions & 1 deletion src/databases/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use chrono::NaiveDateTime;
use chrono::{DateTime, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::databases::mysql::Mysql;
Expand Down Expand Up @@ -292,6 +292,13 @@ pub trait Database: Sync + Send {
/// Get all torrents as `Vec<TorrentCompact>`.
async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, Error>;

/// Get torrents whose stats have not been imported from the tracker at least since a given datetime.
async fn get_torrents_with_stats_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<Vec<TorrentCompact>, Error>;

/// Update a torrent's title with `torrent_id` and `title`.
async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), Error>;

Expand Down
25 changes: 23 additions & 2 deletions src/databases/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;
use std::time::Duration;

use async_trait::async_trait;
use chrono::NaiveDateTime;
use chrono::{DateTime, NaiveDateTime, Utc};
use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool};

Expand All @@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag};
use crate::models::tracker_key::TrackerKey;
use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
use crate::utils::clock::{self, datetime_now};
use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
use crate::utils::hex::from_bytes;

pub struct Mysql {
Expand Down Expand Up @@ -884,6 +884,27 @@ impl Database for Mysql {
.map_err(|_| database::Error::Error)
}

async fn get_torrents_with_stats_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<Vec<TorrentCompact>, database::Error> {
query_as::<_, TorrentCompact>(
"SELECT tt.torrent_id, tt.info_hash
FROM torrust_torrents tt
LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
WHERE tts.updated_at < ? OR tts.updated_at IS NULL
ORDER BY tts.updated_at ASC
LIMIT ?
",
)
.bind(datetime.format(DATETIME_FORMAT).to_string())
.bind(limit)
.fetch_all(&self.pool)
.await
.map_err(|_| database::Error::Error)
}

async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
query("UPDATE torrust_torrent_info SET title = ? WHERE torrent_id = ?")
.bind(title)
Expand Down
25 changes: 23 additions & 2 deletions src/databases/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;
use std::time::Duration;

use async_trait::async_trait;
use chrono::NaiveDateTime;
use chrono::{DateTime, NaiveDateTime, Utc};
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool};

Expand All @@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag};
use crate::models::tracker_key::TrackerKey;
use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
use crate::utils::clock::{self, datetime_now};
use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
use crate::utils::hex::from_bytes;

pub struct Sqlite {
Expand Down Expand Up @@ -876,6 +876,27 @@ impl Database for Sqlite {
.map_err(|_| database::Error::Error)
}

async fn get_torrents_with_stats_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<Vec<TorrentCompact>, database::Error> {
query_as::<_, TorrentCompact>(
"SELECT tt.torrent_id, tt.info_hash
FROM torrust_torrents tt
LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
WHERE tts.updated_at < ? OR tts.updated_at IS NULL
ORDER BY tts.updated_at ASC
LIMIT ?
",
)
.bind(datetime.format(DATETIME_FORMAT).to_string())
.bind(limit)
.fetch_all(&self.pool)
.await
.map_err(|_| database::Error::Error)
}

async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2")
.bind(title)
Expand Down
58 changes: 56 additions & 2 deletions src/tracker/statistics_importer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;
use std::time::Instant;

use log::{error, info};
use chrono::{DateTime, Utc};
use log::{debug, error, info};
use text_colorizer::Colorize;

use super::service::{Service, TorrentInfo, TrackerAPIError};
Expand Down Expand Up @@ -36,13 +37,17 @@ impl StatisticsImporter {
pub async fn import_all_torrents_statistics(&self) -> Result<(), database::Error> {
let torrents = self.database.get_all_torrents_compact().await?;

if torrents.is_empty() {
return Ok(());
}

info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow());

// Start the timer before the loop
let start_time = Instant::now();

for torrent in torrents {
info!(target: LOG_TARGET, "Importing torrent #{} ...", torrent.torrent_id.to_string().yellow());
info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow());

let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await;

Expand All @@ -64,6 +69,55 @@ impl StatisticsImporter {
Ok(())
}

/// Import torrents statistics not updated recently..
///
/// # Errors
///
/// Will return an error if the database query failed.
pub async fn import_torrents_statistics_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<(), database::Error> {
debug!(target: LOG_TARGET, "Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...", datetime.to_string().yellow(), limit.to_string().yellow());

let torrents = self
.database
.get_torrents_with_stats_not_updated_since(datetime, limit)
.await?;

if torrents.is_empty() {
return Ok(());
}

info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow());

// Start the timer before the loop
let start_time = Instant::now();

for torrent in torrents {
info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow());

let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await;

if let Some(err) = ret.err() {
if err != TrackerAPIError::TorrentNotFound {
let message = format!(
"Error updating torrent tracker stats for torrent. Torrent: id {}; infohash {}. Error: {:?}",
torrent.torrent_id, torrent.info_hash, err
);
error!(target: LOG_TARGET, "{}", message);
}
}
}

let elapsed_time = start_time.elapsed();

info!(target: LOG_TARGET, "Statistics import completed in {:.2?}", elapsed_time);

Ok(())
}

/// Import torrent statistics from tracker and update them in database.
///
/// # Errors
Expand Down
12 changes: 10 additions & 2 deletions src/utils/clock.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use chrono::Utc;
use chrono::{DateTime, Duration, Utc};

pub const DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S";

/// Returns the current timestamp in seconds.
///
Expand All @@ -11,10 +13,16 @@ pub fn now() -> u64 {
u64::try_from(chrono::prelude::Utc::now().timestamp()).expect("timestamp should be positive")
}

/// Returns the datetime some seconds ago.
#[must_use]
pub fn seconds_ago_utc(seconds: i64) -> DateTime<chrono::Utc> {
Utc::now() - Duration::seconds(seconds)
}

/// Returns the current time in database format.
///
/// For example: `2024-03-12 15:56:24`.
#[must_use]
pub fn datetime_now() -> String {
Utc::now().format("%Y-%m-%d %H:%M:%S").to_string()
Utc::now().format(DATETIME_FORMAT).to_string()
}

0 comments on commit a1e8ae9

Please sign in to comment.