Skip to content

Commit

Permalink
fix: [torrust#591] WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
josecelano committed Jan 10, 2024
1 parent 49c961c commit 2ba9391
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 25 deletions.
5 changes: 3 additions & 2 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"Containerfile",
"curr",
"Cyberneering",
"datagram",
"datetime",
"Dijke",
"distroless",
Expand Down Expand Up @@ -79,6 +80,7 @@
"nonroot",
"Norberg",
"numwant",
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7",
"oneshot",
"ostr",
"Pando",
Expand Down Expand Up @@ -129,8 +131,7 @@
"Xtorrent",
"Xunlei",
"xxxxxxxxxxxxxxxxxxxxd",
"yyyyyyyyyyyyyyyyyyyyd",
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7"
"yyyyyyyyyyyyyyyyyyyyd"
],
"enableFiletypes": [
"dockerfile",
Expand Down
8 changes: 8 additions & 0 deletions src/bootstrap/jobs/udp_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! for the configuration options.
use std::sync::Arc;

use log::debug;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::UdpTracker;

Expand Down Expand Up @@ -36,6 +37,13 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>) -> Join
.expect("it should be able to start the udp tracker");

tokio::spawn(async move {
debug!(target: "UDP Tracker", "Wait for launcher (UDP service) to finish ...");

assert!(
!server.state.halt_task.is_closed(),
"Halt channel for UDP tracker should be open"
);

server
.state
.task
Expand Down
2 changes: 1 addition & 1 deletion src/servers/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Launcher {
tokio::task::spawn(graceful_shutdown(
handle.clone(),
rx_halt,
format!("Shutting down http server on socket address: {address}"),
format!("Shutting down HTTP server on socket address: {address}"),
));

let tls = self.tls.clone();
Expand Down
70 changes: 48 additions & 22 deletions src/servers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,17 @@ impl UdpServer<Stopped> {
let (tx_start, rx_start) = tokio::sync::oneshot::channel::<Started>();
let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::<Halted>();

assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open");

let launcher = self.state.launcher;

let task = tokio::spawn(async move {
launcher.start(tracker, tx_start, rx_halt).await;
debug!(target: "UDP Tracker", "Launcher start ...");

let server = launcher.start(tracker, tx_start, rx_halt);

server.await;

launcher
});

Expand All @@ -135,8 +142,6 @@ impl UdpServer<Stopped> {
},
};

info!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding);

Ok(running_udp_server)
}
}
Expand Down Expand Up @@ -202,38 +207,59 @@ impl Udp {
tx_start: Sender<Started>,
rx_halt: Receiver<Halted>,
) -> JoinHandle<()> {
let binding = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
let address = binding.local_addr().expect("Could not get local_addr from {binding}.");
let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
let address = socket.local_addr().expect("Could not get local_addr from {binding}.");

let running = tokio::task::spawn(async move {
let halt = async move {
shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}")).await;
};
let halt = tokio::task::spawn(async move {
debug!(target: "UDP Tracker", "Server on socket address: udp://{address} waiting for halt signal ...");

pin_mut!(halt);
shutdown_signal_with_message(
rx_halt,
format!("Shutting down UDP server on socket address: udp://{address}"),
)
.await;
});

info!(target: "UDP Tracker", "Starting on: udp://{}", address);

loop {
let mut data = [0; MAX_PACKET_SIZE];
let binding = binding.clone();
let running = tokio::task::spawn(async move {
let listen = async move {
debug!(target: "UDP Tracker", "Waiting for packets ...");

tokio::select! {
() = & mut halt => {},
loop {
let mut data = [0; MAX_PACKET_SIZE];
let socket_clone = socket.clone();

Ok((valid_bytes, remote_addr)) = binding.recv_from(&mut data) => {
let payload = data[..valid_bytes].to_vec();
match socket_clone.recv_from(&mut data).await {
Ok((valid_bytes, remote_addr)) => {
let payload = data[..valid_bytes].to_vec();

debug!("Received {} bytes", payload.len());
debug!("From: {}", &remote_addr);
debug!("Payload: {:?}", payload);
debug!(target: "UDP Tracker", "Received {} bytes", payload.len());
debug!(target: "UDP Tracker", "From: {}", &remote_addr);
debug!(target: "UDP Tracker", "Payload: {:?}", payload);

let response = handle_packet(remote_addr, payload, &tracker).await;
let response = handle_packet(remote_addr, payload, &tracker).await;

Udp::send_response(binding, remote_addr, response).await;
Udp::send_response(socket_clone, remote_addr, response).await;
}
Err(err) => {
error!("Error reading UDP datagram from socket. Error: {:?}", err);
}
}
}
};

pin_mut!(halt);
pin_mut!(listen);

tokio::select! {
_ = & mut halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); },
() = & mut listen => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); },
}
});

info!(target: "UDP Tracker", "Started on: udp://{}", address);

tx_start
.send(Started { address })
.expect("the UDP Tracker service should not be dropped");
Expand Down

0 comments on commit 2ba9391

Please sign in to comment.