Skip to content

Commit

Permalink
refactor: [#746] rename functions and extract named closures
Browse files Browse the repository at this point in the history
  • Loading branch information
josecelano committed Mar 20, 2024
1 parent cc1cbc1 commit 9015668
Showing 1 changed file with 36 additions and 31 deletions.
67 changes: 36 additions & 31 deletions src/servers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,24 +255,7 @@ impl Udp {

let running = tokio::task::spawn(async move {
debug!(target: "UDP TRACKER", "Started: Waiting for packets on socket address: udp://{address} ...");

let tracker = tracker.clone();
let socket = socket.clone();

let reqs = &mut ActiveRequests::default();

// Main Waiting Loop, awaits on async [`receive_request`].
loop {
if let Some(h) = reqs.rb.push_overwrite(
Self::do_request(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()).abort_handle(),
) {
if !h.is_finished() {
// the task is still running, lets yield and give it a chance to flush.
tokio::task::yield_now().await;
h.abort();
}
}
}
Self::run_udp_server(tracker, socket).await;
});

tx_start
Expand All @@ -292,6 +275,27 @@ impl Udp {
task::yield_now().await; // lets allow the other threads to complete.
}

async fn run_udp_server(tracker: Arc<Tracker>, socket: Arc<UdpSocket>) {
let tracker = tracker.clone();
let socket = socket.clone();

let reqs = &mut ActiveRequests::default();

// Main Waiting Loop, awaits on async [`receive_request`].
loop {
if let Some(h) = reqs.rb.push_overwrite(
Self::spawn_request_processor(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone())
.abort_handle(),
) {
if !h.is_finished() {

Check warning on line 290 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L290

Added line #L290 was not covered by tests
// the task is still running, lets yield and give it a chance to flush.
tokio::task::yield_now().await;
h.abort();

Check warning on line 293 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L292-L293

Added lines #L292 - L293 were not covered by tests
}
}

Check warning on line 295 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L295

Added line #L295 was not covered by tests
}
}

async fn receive_request(socket: Arc<UdpSocket>) -> Result<UdpRequest, Box<std::io::Error>> {
// Wait for the socket to be readable
socket.readable().await?;
Expand All @@ -309,26 +313,27 @@ impl Udp {
}
}

fn do_request(
fn spawn_request_processor(
result: Result<UdpRequest, Box<std::io::Error>>,
tracker: Arc<Tracker>,
socket: Arc<UdpSocket>,
) -> JoinHandle<()> {
// timeout not needed, as udp is non-blocking.
tokio::task::spawn(async move {
match result {
Ok(udp_request) => {
trace!("Received Request from: {}", udp_request.from);
Self::make_response(tracker.clone(), socket.clone(), udp_request).await;
}
Err(error) => {
debug!("error: {error}");
}
tokio::task::spawn(Self::process_request(result, tracker, socket))
}

async fn process_request(result: Result<UdpRequest, Box<std::io::Error>>, tracker: Arc<Tracker>, socket: Arc<UdpSocket>) {
match result {
Ok(udp_request) => {
trace!("Received Request from: {}", udp_request.from);
Self::process_valid_request(tracker.clone(), socket.clone(), udp_request).await;
}
})
Err(error) => {
debug!("error: {error}");
}

Check warning on line 332 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L330-L332

Added lines #L330 - L332 were not covered by tests
}
}

async fn make_response(tracker: Arc<Tracker>, socket: Arc<UdpSocket>, udp_request: UdpRequest) {
async fn process_valid_request(tracker: Arc<Tracker>, socket: Arc<UdpSocket>, udp_request: UdpRequest) {
trace!("Making Response to {udp_request:?}");
let from = udp_request.from;
let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.clone()).await;
Expand Down

0 comments on commit 9015668

Please sign in to comment.