Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read multiple bytes for system events #10

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions lib/sel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,35 @@ type ('a,'b) ev_checker =
let file_descriptors_of l =
Sorted.map_filter (function { WithAttributes.it = ReadInProgress(fd,_); _ } -> Some fd | _ -> None) l

let filter_file_descriptor fds = function
| { WithAttributes.it = ReadInProgress(fd,_); _ } -> List.mem fd fds
| _ -> false

(* For fairness reasons, even if there are immediately ready events we
give a shot to system events with 0 wait, otherwise we wait until a
system event is ready. We never sleep forever, since process death events
do not wakeup select: we anyway wake up 10 times per second *)
let check_for_system_events : ('a system_event,'a) ev_checker = fun waiting ->
(* After advancing once, check if any waiting tasks have lower priority (more important) than the lowest
ready queue, task or system event. If so, try to advance the system task event.
The result is that it when reading 'n' bytes, it is no longer necessary to interleave up to 'n' ready tasks.
*)
let check_for_system_events min_prio_task_queue : ('a system_event,'a) ev_checker = fun waiting ->
let rec check_for_system_events new_ready waiting_skipped min_prio waiting =
let fds = file_descriptors_of waiting in
let ready_fds, _, _ = Unix.select fds [] [] 0.0 in
let new_ready, waiting, min_prio = pull_ready ~advance:advance_system ready_fds waiting in
new_ready, waiting, min_prio

let new_ready_1, waiting, min_prio_1 = pull_ready ~advance:advance_system ready_fds waiting in
let new_ready = Sorted.append new_ready_1 new_ready in
let min_prio = Sorted.min_priority min_prio_1 min_prio in
if ready_fds = [] then
new_ready, Sorted.append waiting waiting_skipped, min_prio
else
let waiting, waiting_skipped_1 = Sorted.partition (filter_file_descriptor ready_fds) waiting in
let waiting, waiting_skipped_2 = Sorted.partition_priority (Sorted.lt_priority min_prio) waiting in
let waiting_skipped = Sorted.concat [waiting_skipped_2; waiting_skipped_1; waiting_skipped] in
check_for_system_events new_ready waiting_skipped min_prio waiting
in
check_for_system_events Sorted.nil Sorted.nil min_prio_task_queue waiting

let check_for_queue_events : ('a queue_event,'a) ev_checker =
fun waiting ->
let new_ready, waiting, min_prio = pull_ready ~advance:advance_queue () waiting in
Expand Down Expand Up @@ -163,8 +182,10 @@ let wait ?(deadline=max_float) todo : 'a WithAttributes.t list * 'a Todo.t =
if is_empty todo then
[], todo
else
let ready_sys, waiting_sys, min_prio_sys = check_for_system_events system in
let min_prio, ready = Sorted.min ready in
let ready_queue, waiting_queue, min_prio_queue = check_for_queue_events queue in
let min_prio = Sorted.min_priority min_prio min_prio_queue in
let ready_sys, waiting_sys, min_prio_sys = check_for_system_events min_prio system in
if Sorted.is_nil ready_sys &&
Sorted.is_nil ready_queue &&
Sorted.is_nil ready &&
Expand All @@ -178,9 +199,7 @@ let wait ?(deadline=max_float) todo : 'a WithAttributes.t list * 'a Todo.t =
let ready = Sorted.to_list (Sorted.append ready_sys ready_queue) in
ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed }
else
let min_prio, ready = Sorted.min ready in
let min_prio = Sorted.min_priority min_prio min_prio_sys in
let min_prio = Sorted.min_priority min_prio min_prio_queue in
let ready_old, postponed_ready = pull_tasks min_prio ready in
let ready_tasks, tasks = pull_tasks min_prio tasks in
let ready_sys, postponed_sys = postpone min_prio ready_sys in
Expand Down
10 changes: 10 additions & 0 deletions lib/sorted.ml
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,13 @@ let partition f { sorted; data } =
in
aux [] [] data

let partition_priority f { sorted; data } =
let rev = if sorted then List.rev else fun x -> x in
let rec aux yes no = function
| [] -> { sorted; data = rev yes }, { sorted; data = rev no }
| x :: xs ->
if f (snd x)
then aux (x :: yes) no xs
else aux yes (x :: no) xs
in
aux [] [] data
1 change: 1 addition & 0 deletions lib/sorted.mli
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ val to_list : 'a t -> 'a list
val of_list : ('a * priority) list -> 'a t
val pp : (Format.formatter -> 'a -> unit) -> Format.formatter -> 'a t -> unit
val partition : ('a -> bool) -> 'a t -> 'a t * 'a t
val partition_priority : (priority -> bool) -> 'a t -> 'a t * 'a t