diff --git a/cSpell.json b/cSpell.json index d09db93b7..9602ba39b 100644 --- a/cSpell.json +++ b/cSpell.json @@ -32,6 +32,7 @@ "Containerfile", "curr", "Cyberneering", + "datagram", "datetime", "Dijke", "distroless", @@ -79,6 +80,7 @@ "nonroot", "Norberg", "numwant", + "nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7", "oneshot", "ostr", "Pando", @@ -129,8 +131,7 @@ "Xtorrent", "Xunlei", "xxxxxxxxxxxxxxxxxxxxd", - "yyyyyyyyyyyyyyyyyyyyd", - "nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7" + "yyyyyyyyyyyyyyyyyyyyd" ], "enableFiletypes": [ "dockerfile", diff --git a/src/bootstrap/jobs/udp_tracker.rs b/src/bootstrap/jobs/udp_tracker.rs index 5911bdf95..6b76d0900 100644 --- a/src/bootstrap/jobs/udp_tracker.rs +++ b/src/bootstrap/jobs/udp_tracker.rs @@ -8,6 +8,7 @@ //! for the configuration options. use std::sync::Arc; +use log::debug; use tokio::task::JoinHandle; use torrust_tracker_configuration::UdpTracker; @@ -36,6 +37,13 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc) -> Join .expect("it should be able to start the udp tracker"); tokio::spawn(async move { + debug!(target: "UDP Tracker", "Wait for launcher (UDP service) to finish ..."); + + assert!( + !server.state.halt_task.is_closed(), + "Halt channel for UDP tracker should be open" + ); + server .state .task diff --git a/src/servers/http/server.rs b/src/servers/http/server.rs index 904ccdcf5..0a4b687b5 100644 --- a/src/servers/http/server.rs +++ b/src/servers/http/server.rs @@ -48,7 +48,7 @@ impl Launcher { tokio::task::spawn(graceful_shutdown( handle.clone(), rx_halt, - format!("Shutting down http server on socket address: {address}"), + format!("Shutting down HTTP server on socket address: {address}"), )); let tls = self.tls.clone(); diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index a0af55101..ac0b1ee0f 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -120,10 +120,17 @@ impl UdpServer { let (tx_start, rx_start) = tokio::sync::oneshot::channel::(); let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::(); + assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open"); + let launcher = self.state.launcher; let task = tokio::spawn(async move { - launcher.start(tracker, tx_start, rx_halt).await; + debug!(target: "UDP Tracker", "Launcher start ..."); + + let server = launcher.start(tracker, tx_start, rx_halt); + + server.await; + launcher }); @@ -135,8 +142,6 @@ impl UdpServer { }, }; - info!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding); - Ok(running_udp_server) } } @@ -202,38 +207,59 @@ impl Udp { tx_start: Sender, rx_halt: Receiver, ) -> JoinHandle<()> { - let binding = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}.")); - let address = binding.local_addr().expect("Could not get local_addr from {binding}."); + let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}.")); + let address = socket.local_addr().expect("Could not get local_addr from {binding}."); - let running = tokio::task::spawn(async move { - let halt = async move { - shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}")).await; - }; + let halt = tokio::task::spawn(async move { + debug!(target: "UDP Tracker", "Server on socket address: udp://{address} waiting for halt signal ..."); - pin_mut!(halt); + shutdown_signal_with_message( + rx_halt, + format!("Shutting down UDP server on socket address: udp://{address}"), + ) + .await; + }); + + info!(target: "UDP Tracker", "Starting on: udp://{}", address); - loop { - let mut data = [0; MAX_PACKET_SIZE]; - let binding = binding.clone(); + let running = tokio::task::spawn(async move { + let listen = async move { + debug!(target: "UDP Tracker", "Waiting for packets ..."); - tokio::select! { - () = & mut halt => {}, + loop { + let mut data = [0; MAX_PACKET_SIZE]; + let socket_clone = socket.clone(); - Ok((valid_bytes, remote_addr)) = binding.recv_from(&mut data) => { - let payload = data[..valid_bytes].to_vec(); + match socket_clone.recv_from(&mut data).await { + Ok((valid_bytes, remote_addr)) => { + let payload = data[..valid_bytes].to_vec(); - debug!("Received {} bytes", payload.len()); - debug!("From: {}", &remote_addr); - debug!("Payload: {:?}", payload); + debug!(target: "UDP Tracker", "Received {} bytes", payload.len()); + debug!(target: "UDP Tracker", "From: {}", &remote_addr); + debug!(target: "UDP Tracker", "Payload: {:?}", payload); - let response = handle_packet(remote_addr, payload, &tracker).await; + let response = handle_packet(remote_addr, payload, &tracker).await; - Udp::send_response(binding, remote_addr, response).await; + Udp::send_response(socket_clone, remote_addr, response).await; + } + Err(err) => { + error!("Error reading UDP datagram from socket. Error: {:?}", err); + } } } + }; + + pin_mut!(halt); + pin_mut!(listen); + + tokio::select! { + _ = & mut halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); }, + () = & mut listen => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); }, } }); + info!(target: "UDP Tracker", "Started on: udp://{}", address); + tx_start .send(Started { address }) .expect("the UDP Tracker service should not be dropped");