Skip to content

Commit

Permalink
Merge pull request #1046 from stlankes/poll
Browse files Browse the repository at this point in the history
add support of a unix-like poll function
  • Loading branch information
mkroening committed Feb 5, 2024
2 parents 774c757 + 5dbbb03 commit b271085
Show file tree
Hide file tree
Showing 16 changed files with 1,285 additions and 667 deletions.
69 changes: 59 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ num-derive = "0.4"
num-traits = { version = "0.2", default-features = false }
pci-ids = { version = "0.2", optional = true }
pci_types = { version = "0.6" }
pflock = "0.2"
rand_chacha = { version = "0.3", default-features = false }
shell-words = { version = "1.1", default-features = false }
smallvec = { version = "1", features = ["const_new"] }
Expand All @@ -94,6 +93,8 @@ talc = { version = "4" }
time = { version = "0.3", default-features = false }
zerocopy = { version = "0.7", features = ["derive"] }
build-time = "0.1.3"
async-trait = "0.1.48"
async-lock = { version = "3.3.0", default-features = false }

[dependencies.smoltcp]
version = "0.11"
Expand Down
18 changes: 10 additions & 8 deletions src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hermit_sync::InterruptTicketMutex;

use crate::arch;

pub struct Console(());
pub(crate) struct Console(());

/// A collection of methods that are required to format
/// a message to Hermit's console.
Expand All @@ -21,14 +21,16 @@ impl fmt::Write for Console {
}
}

impl Console {
#[inline]
pub fn write_all(&mut self, buf: &[u8]) {
arch::output_message_buf(buf)
}
}

#[cfg(feature = "newlib")]
pub static CONSOLE: InterruptTicketMutex<Console> = InterruptTicketMutex::new(Console(()));
#[cfg(not(feature = "newlib"))]
static CONSOLE: InterruptTicketMutex<Console> = InterruptTicketMutex::new(Console(()));

#[doc(hidden)]
pub fn _print(args: ::core::fmt::Arguments<'_>) {
use core::fmt::Write;
CONSOLE.lock().write_fmt(args).unwrap();
}

#[cfg(all(test, not(target_os = "none")))]
mod tests {
Expand Down
214 changes: 214 additions & 0 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,34 @@ use alloc::task::Wake;
use core::future::Future;
use core::sync::atomic::AtomicU32;
use core::task::{Context, Poll, Waker};
use core::time::Duration;

use crossbeam_utils::Backoff;
use hermit_sync::without_interrupts;
#[cfg(any(feature = "tcp", feature = "udp"))]
use smoltcp::time::Instant;

use crate::arch::core_local::*;
#[cfg(all(
any(feature = "tcp", feature = "udp"),
not(feature = "pci"),
not(feature = "newlib")
))]
use crate::drivers::mmio::get_network_driver;
#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))]
use crate::drivers::net::NetworkDriver;
#[cfg(all(
any(feature = "tcp", feature = "udp"),
feature = "pci",
not(feature = "newlib")
))]
use crate::drivers::pci::get_network_driver;
#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))]
use crate::executor::network::network_delay;
use crate::executor::task::AsyncTask;
use crate::fd::IoError;
#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))]
use crate::scheduler::PerCoreSchedulerExt;
use crate::synch::futex::*;

struct TaskNotify {
Expand Down Expand Up @@ -77,3 +100,194 @@ pub fn init() {
#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))]
crate::executor::network::init();
}

#[inline]
pub(crate) fn now() -> u64 {
crate::arch::kernel::systemtime::now_micros()
}

/// Blocks the current thread on `f`, running the executor when idling.
pub(crate) fn poll_on<F, T>(future: F, timeout: Option<Duration>) -> Result<T, IoError>
where
F: Future<Output = Result<T, IoError>>,
{
#[cfg(any(feature = "tcp", feature = "udp"))]
let nic = get_network_driver();

// disable network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
let no_retransmission = if let Some(nic) = nic {
let mut guard = nic.lock();
guard.set_polling_mode(true);
guard.get_checksums().tcp.tx()
} else {
true
};

let start = now();
let waker = core::task::Waker::noop();
let mut cx = Context::from_waker(&waker);
let mut future = future;
let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) };

loop {
// run background tasks
run();

if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let wakeup_time =
network_delay(Instant::from_micros_const(now().try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(wakeup_time);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return t;
}

if let Some(duration) = timeout {
if Duration::from_micros(now() - start) >= duration {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let wakeup_time =
network_delay(Instant::from_micros_const(now().try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(wakeup_time);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return Err(IoError::ETIME);
}
}
}
}

/// Blocks the current thread on `f`, running the executor when idling.
pub(crate) fn block_on<F, T>(future: F, timeout: Option<Duration>) -> Result<T, IoError>
where
F: Future<Output = Result<T, IoError>>,
{
#[cfg(any(feature = "tcp", feature = "udp"))]
let nic = get_network_driver();

// disable network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
let no_retransmission = if let Some(nic) = nic {
let mut guard = nic.lock();
guard.set_polling_mode(true);
!guard.get_checksums().tcp.tx()
} else {
true
};

let backoff = Backoff::new();
let start = now();
let task_notify = Arc::new(TaskNotify::new());
let waker = task_notify.clone().into();
let mut cx = Context::from_waker(&waker);
let mut future = future;
let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) };

loop {
// run background tasks
run();

let now = now();
if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let network_timer =
network_delay(Instant::from_micros_const(now.try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(network_timer);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return t;
}

if let Some(duration) = timeout {
if Duration::from_micros(now - start) >= duration {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let network_timer =
network_delay(Instant::from_micros_const(now.try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(network_timer);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return Err(IoError::ETIME);
}
}

#[cfg(any(feature = "tcp", feature = "udp"))]
{
let delay = network_delay(Instant::from_micros_const(now.try_into().unwrap()))
.map(|d| d.total_micros());

if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 {
let wakeup_time =
timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
if !no_retransmission {
let ticks = crate::arch::processor::get_timer_ticks();
let network_timer = delay.map(|d| ticks + d);
core_scheduler().add_network_timer(network_timer);
}

// allow network interrupts
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

// switch to another task
task_notify.wait(wakeup_time);

// restore default values
if let Some(nic) = nic {
nic.lock().set_polling_mode(true);
}
backoff.reset();
} else {
backoff.snooze();
}
}
#[cfg(not(any(feature = "tcp", feature = "udp")))]
{
if backoff.is_completed() {
let wakeup_time =
timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());

// switch to another task
task_notify.wait(wakeup_time);

// restore default values
backoff.reset();
} else {
backoff.snooze();
}
}
}
}
Loading

0 comments on commit b271085

Please sign in to comment.