feat: [torrust#469] import torrent stats using multiple torrents tracker API endpoint

There is a new feature in the tracker API where you can get stats for a
list of infohashes:

<http://localhost:1212/api/v1/torrents?token=MyAccessToken&info_hash=f584cba7dd4008ecc026ac2dc0ce1ad179822f5f&info_hash=f59caeaf12e7bc8a289c39b698d085bc27eec1c2&info_hash=f6465cb6bd227e7c97d1de7cb426551af97eae41&info_hash=f655996ba112da8d0835463e6be4f47ff0bfef0c&info_hash=f68b7d6296d3e933b455c6107badf8dc6eeccadc&info_hash=f6a4eec77008786d91c344716ed2bb58570cdbd6&info_hash=f6ae9710af2d09faf8d337855e441087e2ff9286&info_hash=f6f89b0a54f3944f36027ff38ec950781e654836&info_hash=f77cfff1ab500a203c73141f98947acd7b5d0686&info_hash=f792faf15179d7c01fbf4647e96c28b155810f90&info_hash=f7c895711191b602211bc267fc0468c302f6974d&info_hash=f7d4589f96974ec030a798f943a82ecfdeb2f013&info_hash=f7ec8c6963cefcaf4c1b322358ac5d9edfb5b8b6&info_hash=f7fc543c48f1535692efa8e623e738bc67997eab&info_hash=f80ba0b3ad573a403e16d7b3d7c17863676f8f1f&info_hash=f871d2a6d41b30c4caa6255c653e1f02cd8996c1&info_hash=f894f06b6d0411f28d5906177103354db3f8340d&info_hash=f89b08ae4a4af5d1327b31bb1a6ed2f9b3d227b4&info_hash=f913667273b8562ec30366f8ba32e7e4a2f65742&info_hash=f9d5713cdf9539f1feffae05c04cfdbbcaea18a8&info_hash=f9eb982706d058dc855cc9a7528048631fff3d33&info_hash=f9fb61ad5aadf585dd86cb63e5bcc6dfed71f6fc&info_hash=faae957e9a3d7f9fd11074b3a49ce6dfd8d1c75b&info_hash=fb08e03e518fb7d5ae6ff73af3854d3e75a6b228&info_hash=fb25d2c0a0a109d90db1459547c926c8fd32f888&info_hash=fb6a3274e36bd2b4f5e3833308e57a0d7eb1cc27&info_hash=fb765107b4029569009003eeb4c87a5707612807&info_hash=fb994291a47627fa3b84849709965aa9bf781f58&info_hash=fba2da365997d3aab086cc2998274051f5a3cc8c&info_hash=fba992473ac2b1760fcda77b9877eeb4e48e4990&info_hash=fbdeb27908830e438eafd1a3f84a114ff0f428bf&info_hash=fbee94aeda72de1035ef8ee2dca861c722d5cf26&info_hash=fc82989b9f718f2fc3cb8487d1fe4ced411f9630&info_hash=fc89b80f119bc6ae91e7263b3f21db55b3fd16ad&info_hash=fcb85658c7ca1a82b5cc563af8165c4d20aa2d9e&info_hash=fcf40cb66b0bb72c9e478f07957a1ee9d140ce75&info_hash=fcf57886742d297d2017b2f83fa69ec8814a0d3d&info_hash=fd0bf9d869d2886a370f81838f978c2c26da4222&info_hash=fd3a4be495bd64a7e2ba4dc8b78eed1f8958f644&info_hash=fe0a1913ad2a1dfa8ddc93e02217c5d8ef384306&info_hash=fe28c9463c50d8febc1b7757553c05b725b42879&info_hash=fe7089ca13b7b218f4af8e98303cf1fbaacc90eb&info_hash=fea586402fccca172470715aa3558978d952799d&info_hash=fee8409338d889ee130dcee19bda84deee72da65&info_hash=ff01f0bf22e5f8483b8e82b2bc88c9c536a76bfa&info_hash=ff52b816d9bad366c2ed1232efd0711b3e262f92&info_hash=ff589afca896eb04bacf245e3c041e6feb54ab05&info_hash=ff6a1c9c60c16ff96115ee95a814017b5c1709a8&info_hash=ffe46c2247e844804adff54d770fe274d2d2e873&info_hash=fff0f6bb2eaae8b2e0e163d1acddd8ff2e4dec7e>

This way you can get the stats for many torrents in one request.

This commit updates the statistics importer to use this new endpoint.
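For example, a shorter request of the same form with just two infohashes. The response is a JSON array with one entry per torrent; the field names follow the TorrentBasicInfo struct added below in src/tracker/service.rs, while the numeric values here are made up for illustration:

<http://localhost:1212/api/v1/torrents?token=MyAccessToken&info_hash=f584cba7dd4008ecc026ac2dc0ce1ad179822f5f&info_hash=f59caeaf12e7bc8a289c39b698d085bc27eec1c2>

[
  { "info_hash": "f584cba7dd4008ecc026ac2dc0ce1ad179822f5f", "seeders": 1, "completed": 0, "leechers": 2 },
  { "info_hash": "f59caeaf12e7bc8a289c39b698d085bc27eec1c2", "seeders": 0, "completed": 3, "leechers": 1 }
]
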
josecelano committed Mar 13, 2024
1 parent 16cbea8 commit af7150b
Showing 4 changed files with 123 additions and 17 deletions.
9 changes: 9 additions & 0 deletions src/console/cronjobs/tracker_statistics_importer.rs
@@ -83,6 +83,15 @@ pub fn start(

info!("Tracker statistics importer cronjob starting ...");

// code-review: we set an execution interval to avoid intense polling of
// the database. Without the interval we would be constantly querying
// whether there are torrent stats pending to update, even when there are
// no torrents to update. Maybe we should only sleep for 100 milliseconds
// when the latest execution did not update any torrents.
// With the current limit we can only import 50 torrent stats every 100
// milliseconds, which is 500 torrents per second (1,800,000 torrents per
// hour), assuming the tracker can handle a request in 100 milliseconds.

let execution_interval_in_milliseconds = 100;
let execution_interval_duration = std::time::Duration::from_millis(execution_interval_in_milliseconds);
let mut execution_interval = tokio::time::interval(execution_interval_duration);
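A minimal, self-contained sketch of the polling pattern described in the comment above (only the tokio runtime is assumed; the real cronjob in src/console/cronjobs/tracker_statistics_importer.rs does more than this, and import_next_batch is a hypothetical stand-in for the importer call):

use std::time::Duration;

#[tokio::main]
async fn main() {
    let execution_interval_in_milliseconds = 100;
    let execution_interval_duration = Duration::from_millis(execution_interval_in_milliseconds);
    let mut execution_interval = tokio::time::interval(execution_interval_duration);

    loop {
        // Waits for the next tick, so the database is polled at most ten
        // times per second even when there is nothing to import.
        execution_interval.tick().await;

        import_next_batch().await;
    }
}

// Hypothetical stand-in: import stats for the next batch of (up to 50)
// torrents pending an update.
async fn import_next_batch() {}
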
25 changes: 23 additions & 2 deletions src/tracker/api.rs
@@ -15,6 +15,8 @@ impl ConnectionInfo {
}
}

const TOKEN_PARAM_NAME: &str = "token";

pub struct Client {
pub connection_info: ConnectionInfo,
api_base_url: String,
@@ -29,7 +31,7 @@
pub fn new(connection_info: ConnectionInfo) -> Result<Self, Error> {
let base_url = format!("{}/api/v1", connection_info.url);
let client = reqwest::Client::builder().timeout(Duration::from_secs(5)).build()?;
let token_param = [("token".to_string(), connection_info.token.to_string())];
let token_param = [(TOKEN_PARAM_NAME.to_string(), connection_info.token.to_string())];

Ok(Self {
connection_info,
@@ -72,7 +74,7 @@
self.client.post(request_url).query(&self.token_param).send().await
}

/// Retrieve the info for a torrent.
/// Retrieve the info for one torrent.
///
/// # Errors
///
@@ -82,4 +84,23 @@

self.client.get(request_url).query(&self.token_param).send().await
}

/// Retrieve the info for multiple torrents at the same time.
///
/// # Errors
///
/// Will return an error if the HTTP request fails.
pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result<Response, Error> {
let request_url = format!("{}/torrents", self.api_base_url);

let mut query_params: Vec<(String, String)> = Vec::with_capacity(info_hashes.len() + 1);

query_params.push((TOKEN_PARAM_NAME.to_string(), self.connection_info.token.clone()));

for info_hash in info_hashes {
query_params.push(("info_hash".to_string(), info_hash.clone()));
}

self.client.get(request_url).query(&query_params).send().await
}
}
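A small, self-contained check of the query-string format produced by the parameter list built in get_torrents_info above: passing a Vec of (String, String) pairs to reqwest's .query() repeats the info_hash parameter, which is the format the tracker endpoint expects. The URL and token below are placeholders and only the reqwest crate is assumed:

fn main() {
    let info_hashes = vec![
        "f584cba7dd4008ecc026ac2dc0ce1ad179822f5f".to_string(),
        "f59caeaf12e7bc8a289c39b698d085bc27eec1c2".to_string(),
    ];

    // Same construction as in `Client::get_torrents_info`: token first, then
    // one ("info_hash", ...) pair per torrent.
    let mut query_params: Vec<(String, String)> = Vec::with_capacity(info_hashes.len() + 1);
    query_params.push(("token".to_string(), "MyAccessToken".to_string()));
    for info_hash in &info_hashes {
        query_params.push(("info_hash".to_string(), info_hash.clone()));
    }

    let request = reqwest::Client::new()
        .get("http://localhost:1212/api/v1/torrents")
        .query(&query_params)
        .build()
        .expect("valid request");

    // Prints: .../api/v1/torrents?token=MyAccessToken&info_hash=...&info_hash=...
    println!("{}", request.url());
}
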
56 changes: 56 additions & 0 deletions src/tracker/service.rs
@@ -48,6 +48,14 @@ pub struct TorrentInfo {
pub peers: Vec<Peer>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct TorrentBasicInfo {
pub info_hash: String,
pub seeders: i64,
pub completed: i64,
pub leechers: i64,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct Peer {
pub peer_id: Option<PeerId>,
@@ -259,6 +267,54 @@ impl Service {
}
}

/// Get torrent info from tracker in batches.
///
/// # Errors
///
/// Will return an error if the HTTP request to get torrent info fails or
/// if the response cannot be parsed.
pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result<Vec<TorrentBasicInfo>, TrackerAPIError> {
debug!(target: "tracker-service", "get torrents info");

let maybe_response = self.api_client.get_torrents_info(info_hashes).await;

debug!(target: "tracker-service", "get torrents info response result: {:?}", maybe_response);

match maybe_response {
Ok(response) => {
let status: StatusCode = map_status_code(response.status());

let body = response.text().await.map_err(|_| {
error!(target: "tracker-service", "response without body");
TrackerAPIError::MissingResponseBody
})?;

match status {
StatusCode::OK => serde_json::from_str(&body).map_err(|e| {
error!(
target: "tracker-service", "Failed to parse torrents info from tracker response. Body: {}, Error: {}",
body, e
);
TrackerAPIError::FailedToParseTrackerResponse { body }
}),
StatusCode::INTERNAL_SERVER_ERROR => {
if body == Self::invalid_token_body() {
Err(TrackerAPIError::InvalidToken)
} else {
error!(target: "tracker-service", "get torrents info 500 response: status {status}, body: {body}");
Err(TrackerAPIError::InternalServerError)
}
}
_ => {
error!(target: "tracker-service", "get torrents info unhandled response: status {status}, body: {body}");
Err(TrackerAPIError::UnexpectedResponseStatus)
}
}
}
Err(_) => Err(TrackerAPIError::TrackerOffline),
}
}

/// Issue a new tracker key from tracker.
async fn retrieve_new_tracker_key(&self, user_id: i64) -> Result<TrackerKey, TrackerAPIError> {
debug!(target: "tracker-service", "retrieve key: {user_id}");
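For reference, the body shape that the serde_json::from_str call in get_torrents_info above expects is a JSON array that deserializes into Vec<TorrentBasicInfo>. A self-contained sketch with made-up values (the struct mirrors the one added in src/tracker/service.rs; serde and serde_json are assumed):

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct TorrentBasicInfo {
    pub info_hash: String,
    pub seeders: i64,
    pub completed: i64,
    pub leechers: i64,
}

fn main() {
    // Made-up body with the same shape as the tracker's `GET /api/v1/torrents` response.
    let body = r#"[
        { "info_hash": "f584cba7dd4008ecc026ac2dc0ce1ad179822f5f", "seeders": 1, "completed": 0, "leechers": 2 },
        { "info_hash": "f59caeaf12e7bc8a289c39b698d085bc27eec1c2", "seeders": 0, "completed": 3, "leechers": 1 }
    ]"#;

    let stats: Vec<TorrentBasicInfo> = serde_json::from_str(body).expect("a JSON array of torrent stats");

    assert_eq!(stats.len(), 2);
    assert_eq!(stats[0].seeders, 1);
    println!("{stats:?}");
}
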
50 changes: 35 additions & 15 deletions src/tracker/statistics_importer.rs
@@ -58,6 +58,7 @@ impl StatisticsImporter {
torrent.torrent_id, torrent.info_hash, err
);
error!(target: "statistics_importer", "{}", message);
// todo: return a service error that can be a tracker API error or a database error.
}
}
}
@@ -92,29 +93,48 @@ impl StatisticsImporter {

info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow());

// Start the timer before the loop
let start_time = Instant::now();
// Import stats for all torrents in one request

for torrent in torrents {
info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow());
let info_hashes: Vec<String> = torrents.iter().map(|t| t.info_hash.clone()).collect();

let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await;
let torrent_info_vec = match self.tracker_service.get_torrents_info(&info_hashes).await {
Ok(torrents_info) => torrents_info,
Err(err) => {
let message = format!("Error getting torrents tracker stats. Error: {err:?}");
error!(target: LOG_TARGET, "{}", message);
// todo: return a service error that can be a tracker API error or a database error.
return Ok(());
}
};

if let Some(err) = ret.err() {
if err != TrackerAPIError::TorrentNotFound {
let message = format!(
"Error updating torrent tracker stats for torrent. Torrent: id {}; infohash {}. Error: {:?}",
torrent.torrent_id, torrent.info_hash, err
// Update stats for all torrents

for torrent in torrents {
match torrent_info_vec.iter().find(|t| t.info_hash == torrent.info_hash) {
None => {
// No stats for this torrent in the tracker
drop(
self.database
.update_tracker_info(torrent.torrent_id, &self.tracker_url, 0, 0)
.await,
);
}
Some(torrent_info) => {
// Update torrent stats for this tracker
drop(
self.database
.update_tracker_info(
torrent.torrent_id,
&self.tracker_url,
torrent_info.seeders,
torrent_info.leechers,
)
.await,
);
error!(target: LOG_TARGET, "{}", message);
}
}
}

let elapsed_time = start_time.elapsed();

info!(target: LOG_TARGET, "Statistics import completed in {:.2?}", elapsed_time);

Ok(())
}

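One design note on the matching loop in the importer above: torrent_info_vec.iter().find(...) scans the tracker response once per local torrent, which is fine for the small batches used here. If batches grew, the response could be indexed by infohash first. A sketch of that alternative, not part of this commit:

use std::collections::HashMap;

// Sketch only: index the tracker response by infohash so each local torrent
// is matched with a single map lookup instead of a linear `find` per torrent.
#[derive(Debug, Clone)]
struct TorrentBasicInfo {
    info_hash: String,
    seeders: i64,
    completed: i64,
    leechers: i64,
}

fn index_by_info_hash(stats: Vec<TorrentBasicInfo>) -> HashMap<String, TorrentBasicInfo> {
    stats.into_iter().map(|info| (info.info_hash.clone(), info)).collect()
}

fn main() {
    let response = vec![TorrentBasicInfo {
        info_hash: "f584cba7dd4008ecc026ac2dc0ce1ad179822f5f".to_string(),
        seeders: 1,
        completed: 0,
        leechers: 2,
    }];

    let by_hash = index_by_info_hash(response);

    // In the importer loop, `by_hash.get(&torrent.info_hash)` would replace
    // the `torrent_info_vec.iter().find(...)` call above.
    println!("{:?}", by_hash.get("f584cba7dd4008ecc026ac2dc0ce1ad179822f5f"));
}
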
