Skip to content

Commit

Permalink
feat: [#625] a new UDP tracker client
Browse files Browse the repository at this point in the history
You can use it with:

```console
cargo run --bin udp_tracker_client 144.126.245.19:6969 9c38422213e30bff212b30c360d26f9a02136422
```

and the output should be something like:

```
AnnounceIpv4(
    AnnounceResponse {
        transaction_id: TransactionId(
            -888840697,
        ),
        announce_interval: AnnounceInterval(
            300,
        ),
        leechers: NumberOfPeers(
            0,
        ),
        seeders: NumberOfPeers(
            4,
        ),
        peers: [
            ResponsePeer {
                ip_address: xx.yy.zz.254,
                port: Port(
                    51516,
                ),
            },
            ResponsePeer {
                ip_address: xx.yy.zz.20,
                port: Port(
                    59448,
                ),
            },
            ResponsePeer {
                ip_address: xx.yy.zz.224,
                port: Port(
                    58587,
                ),
            },
        ],
    },
)
```
  • Loading branch information
josecelano committed Jan 19, 2024
1 parent 62fddba commit f0710d3
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 3 deletions.
154 changes: 154 additions & 0 deletions src/bin/udp_tracker_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::env;
use std::net::{Ipv4Addr, SocketAddr};
use std::str::FromStr;

use aquatic_udp_protocol::common::InfoHash;
use aquatic_udp_protocol::{
AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response,
TransactionId,
};
use log::{debug, LevelFilter};
use torrust_tracker::shared::bit_torrent::info_hash::InfoHash as TorrustInfoHash;
use torrust_tracker::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient};

const ASSIGNED_BY_OS: i32 = 0;
const RANDOM_TRANSACTION_ID: i32 = -888_840_697;

#[tokio::main]
async fn main() {
setup_logging(LevelFilter::Info);

let (remote_socket_addr, info_hash) = parse_arguments();

// Configuration
let local_port = ASSIGNED_BY_OS;
let transaction_id = RANDOM_TRANSACTION_ID;
let bind_to = format!("0.0.0.0:{local_port}");

// Bind to local port

debug!("Binding to: {bind_to}");
let udp_client = UdpClient::bind(&bind_to).await;
let bound_to = udp_client.socket.local_addr().unwrap();
debug!("Bound to: {bound_to}");

// Connect to remote socket

debug!("Connecting to remote: udp://{remote_socket_addr}");
udp_client.connect(&remote_socket_addr).await;

let udp_tracker_client = UdpTrackerClient { udp_client };

let transaction_id = TransactionId(transaction_id);

let connection_id = send_connection_request(transaction_id, &udp_tracker_client).await;

let response = send_announce_request(
connection_id,
transaction_id,
info_hash,
Port(bound_to.port()),
&udp_tracker_client,
)
.await;

println!("{response:#?}");
}

fn setup_logging(level: LevelFilter) {
if let Err(_err) = fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"{} [{}][{}] {}",
chrono::Local::now().format("%+"),
record.target(),
record.level(),
message
));
})
.level(level)
.chain(std::io::stdout())
.apply()
{
panic!("Failed to initialize logging.")
}

debug!("logging initialized.");
}

fn parse_arguments() -> (String, TorrustInfoHash) {
let args: Vec<String> = env::args().collect();

if args.len() != 3 {
eprintln!("Error: invalid number of arguments!");
eprintln!("Usage: cargo run --bin udp_tracker_client <UDP_TRACKER_SOCKET_ADDRESS> <INFO_HASH>");
eprintln!("Example: cargo run --bin udp_tracker_client 144.126.245.19:6969 9c38422213e30bff212b30c360d26f9a02136422");
std::process::exit(1);
}

let remote_socket_addr = &args[1];
let _valid_socket_addr = remote_socket_addr.parse::<SocketAddr>().unwrap_or_else(|_| {
panic!(
"Invalid argument: `{}`. Argument 1 should be a valid socket address. For example: `144.126.245.19:6969`.",
args[1]
)
});
let info_hash = TorrustInfoHash::from_str(&args[2]).unwrap_or_else(|_| {
panic!(
"Invalid argument: `{}`. Argument 2 should be a valid infohash. For example: `9c38422213e30bff212b30c360d26f9a02136422`.",
args[2]
)
});

(remote_socket_addr.to_string(), info_hash)
}

async fn send_connection_request(transaction_id: TransactionId, client: &UdpTrackerClient) -> ConnectionId {
debug!("Sending connection request with transaction id: {transaction_id:#?}");

let connect_request = ConnectRequest { transaction_id };

client.send(connect_request.into()).await;

let response = client.receive().await;

debug!("connection request response:\n{response:#?}");

match response {
Response::Connect(connect_response) => connect_response.connection_id,
_ => panic!("error connecting to udp server. Unexpected response"),
}
}

async fn send_announce_request(
connection_id: ConnectionId,
transaction_id: TransactionId,
info_hash: TorrustInfoHash,
port: Port,
client: &UdpTrackerClient,
) -> Response {
debug!("Sending announce request with transaction id: {transaction_id:#?}");

let announce_request = AnnounceRequest {
connection_id,
transaction_id,
info_hash: InfoHash(info_hash.bytes()),
peer_id: PeerId(*b"-qB00000000000000001"),
bytes_downloaded: NumberOfBytes(0i64),
bytes_uploaded: NumberOfBytes(0i64),
bytes_left: NumberOfBytes(0i64),
event: AnnounceEvent::Started,
ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)),
key: PeerKey(0u32),
peers_wanted: NumberOfPeers(1i32),
port,
};

client.send(announce_request.into()).await;

let response = client.receive().await;

debug!("announce request response:\n{response:#?}");

response
}
32 changes: 29 additions & 3 deletions src/shared/bit_torrent/tracker/udp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::Duration;

use aquatic_udp_protocol::{ConnectRequest, Request, Response, TransactionId};
use log::debug;
use tokio::net::UdpSocket;
use tokio::time;

Expand All @@ -19,7 +20,12 @@ impl UdpClient {
///
/// Will panic if the local address can't be bound.
pub async fn bind(local_address: &str) -> Self {
let socket = UdpSocket::bind(local_address).await.unwrap();
let valid_socket_addr = local_address
.parse::<SocketAddr>()
.unwrap_or_else(|_| panic!("{local_address} is not a valid socket address"));

let socket = UdpSocket::bind(valid_socket_addr).await.unwrap();

Self {
socket: Arc::new(socket),
}
Expand All @@ -29,7 +35,14 @@ impl UdpClient {
///
/// Will panic if can't connect to the socket.
pub async fn connect(&self, remote_address: &str) {
self.socket.connect(remote_address).await.unwrap();
let valid_socket_addr = remote_address
.parse::<SocketAddr>()
.unwrap_or_else(|_| panic!("{remote_address} is not a valid socket address"));

match self.socket.connect(valid_socket_addr).await {
Ok(()) => debug!("Connected successfully"),
Err(e) => panic!("Failed to connect: {e:?}"),

Check warning on line 44 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L44

Added line #L44 was not covered by tests
}
}

/// # Panics
Expand All @@ -39,6 +52,8 @@ impl UdpClient {
/// - Can't write to the socket.
/// - Can't send data.
pub async fn send(&self, bytes: &[u8]) -> usize {
debug!(target: "UDP client", "send {bytes:?}");

self.socket.writable().await.unwrap();
self.socket.send(bytes).await.unwrap()
}
Expand All @@ -50,8 +65,15 @@ impl UdpClient {
/// - Can't read from the socket.
/// - Can't receive data.
pub async fn receive(&self, bytes: &mut [u8]) -> usize {
debug!(target: "UDP client", "receiving ...");

self.socket.readable().await.unwrap();
self.socket.recv(bytes).await.unwrap()

let size = self.socket.recv(bytes).await.unwrap();

debug!(target: "UDP client", "{size} bytes received {bytes:?}");

size
}
}

Expand All @@ -73,6 +95,8 @@ impl UdpTrackerClient {
///
/// Will panic if can't write request to bytes.
pub async fn send(&self, request: Request) -> usize {
debug!(target: "UDP tracker client", "send request {request:?}");

// Write request into a buffer
let request_buffer = vec![0u8; MAX_PACKET_SIZE];
let mut cursor = Cursor::new(request_buffer);
Expand All @@ -99,6 +123,8 @@ impl UdpTrackerClient {

let payload_size = self.udp_client.receive(&mut response_buffer).await;

debug!(target: "UDP tracker client", "received {payload_size} bytes. Response {response_buffer:?}");

Response::from_bytes(&response_buffer[..payload_size], true).unwrap()
}
}
Expand Down
2 changes: 2 additions & 0 deletions tests/servers/udp/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ mod receiving_an_announce_request {

let response = client.receive().await;

println!("test response {response:?}");

assert!(is_ipv4_announce_response(&response));
}
}
Expand Down

0 comments on commit f0710d3

Please sign in to comment.