Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async delete #243

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions bin/worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ let update_docker () =
(* Update hook that does nothing: the returned promise is already resolved
   with a callback which, when called, returns an already-resolved unit
   promise. *)
let update_normal () =
  let finish () = Lwt.return_unit in
  Lwt.return finish

let main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
let main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
setup_log ?style_renderer ?formatter level;
let update =
if Sys.file_exists "/.dockerenv" then update_docker
Expand All @@ -57,16 +57,16 @@ let main ?style_renderer level ?formatter registration_path capacity name allow_
Lwt_main.run begin
let vat = Capnp_rpc_unix.client_only_vat () in
let sr = Capnp_rpc_unix.Cap_file.load vat registration_path |> or_die in
Cluster_worker.run ~capacity ~name ~allow_push ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder_prune_item_threshold ~obuilder_prune_limit ?obuilder ~additional_metrics ~state_dir ~update sr
Cluster_worker.run ~capacity ~name ~allow_push ~healthcheck_period ?prune_threshold ?docker_max_df_size ~obuilder_prune_threshold ~obuilder_prune_limit ?obuilder ~additional_metrics ~state_dir ~update sr
end

(* Command-line parsing *)
let main ~install (style_renderer, args1) (level, args2) ((registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_item_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics), args3) =
let main ~install (style_renderer, args1) (level, args2) ((registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics), args3) =
if install then
Ok (Winsvc_wrapper.install name "OCluster Worker" "Run a build worker" (args1 @ args2 @ args3))
else
Ok (Winsvc_wrapper.run name state_dir (fun ?formatter () ->
main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics))
main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics))


open Cmdliner
Expand Down Expand Up @@ -123,22 +123,13 @@ let docker_max_df_size =

let obuilder_prune_threshold =
Arg.value @@
Arg.opt Arg.(some float) None @@
Arg.opt Arg.float 30.0 @@
Arg.info
~doc:"If using OBuilder, this threshold is used to prune the stored builds if the free space falls below this (0-100)."
~docv:"PERCENTAGE"
~docs:"OBUILDER"
["obuilder-prune-threshold"]

(* Command-line option [--obuilder-prune-item-threshold]: an optional [int64]
   whose value is [None] when the option is not given. It is listed in the
   "OBUILDER" section of the generated help. *)
let obuilder_prune_item_threshold =
  let info =
    Arg.info
      ~docs:"OBUILDER"
      ~docv:"ITEMS"
      ~doc:"If using OBuilder, this threshold is used to prune the stored builds if the number of cached steps exceeds this value."
      ["obuilder-prune-item-threshold"]
  in
  let converter = Arg.some Arg.int64 in
  Arg.value (Arg.opt converter None info)

let obuilder_prune_limit =
Arg.value @@
Arg.opt Arg.int 100 @@
Expand Down Expand Up @@ -202,11 +193,11 @@ module Obuilder_config = struct
end

let worker_opts_t =
let worker_opts registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
(registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_item_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics) in
let worker_opts registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
(registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics) in
Term.(with_used_args
(const worker_opts $ connect_addr $ capacity $ worker_name $ allow_push $ healthcheck_period
$ prune_threshold $ docker_max_df_size $ obuilder_prune_threshold $ obuilder_prune_item_threshold $ obuilder_prune_limit $ state_dir $ Obuilder_config.v $ additional_metrics))
$ prune_threshold $ docker_max_df_size $ obuilder_prune_threshold $ obuilder_prune_limit $ state_dir $ Obuilder_config.v $ additional_metrics))

let cmd ~install =
let doc = "Run a build worker" in
Expand Down
2 changes: 1 addition & 1 deletion obuilder
4 changes: 2 additions & 2 deletions test/mock_builder.ml
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ let update () =
Lwt.return (fun () -> failwith "Mock restart")

let run ?(capacity=1) ?(name="worker-1") ~switch t registration_service =
let thread = Cluster_worker.run ~switch ~capacity ~name ~build:(build t) ~healthcheck_period:600.0 ~update ~state_dir registration_service in
let thread = Cluster_worker.run ~switch ~capacity ~name ~build:(build t) ~update ~state_dir registration_service in
Lwt.on_failure thread
(fun ex -> if Lwt_switch.is_on switch then raise ex)

let run_remote ~builder_switch ~network_switch ?(capacity=1) ?(name="worker-1") t registration_service =
let thread =
let registration_service = Mock_network.remote ~switch:network_switch registration_service in
Cluster_worker.run ~switch:builder_switch ~capacity ~name ~build:(build t) ~healthcheck_period:600.0 ~update ~state_dir registration_service
Cluster_worker.run ~switch:builder_switch ~capacity ~name ~build:(build t) ~update ~state_dir registration_service
in
Lwt.on_failure thread
(fun ex ->
Expand Down
63 changes: 15 additions & 48 deletions worker/cluster_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,6 @@ open Capnp_rpc_lwt
module Log_data = Log_data
module Process = Process

(* Prometheus metrics reported by the worker. Every metric is created with
   namespace "ocluster" and subsystem "worker". *)
module Metrics = struct
  open Prometheus

  let namespace = "ocluster"
  let subsystem = "worker"

  (* Counter of accepted jobs. *)
  let jobs_accepted =
    Counter.v ~namespace ~subsystem
      ~help:"Number of jobs accepted in total"
      "jobs_accepted_total"

  (* Summary family of job durations; apply it to a "result" label value to
     get the summary to observe. *)
  let job_time =
    Summary.v_label ~namespace ~subsystem ~label_name:"result"
      ~help:"Time jobs ran for"
      "job_time_seconds"

  (* Summary of Docker Hub upload durations. *)
  let docker_push_time =
    Summary.v ~namespace ~subsystem
      ~help:"Time uploading to Docker Hub"
      "docker_push_time_seconds"

  (* Summary of Docker cache pruning durations. *)
  let docker_prune_time =
    Summary.v ~namespace ~subsystem
      ~help:"Time spent pruning Docker cache"
      "docker_prune_time_seconds"

  (* Gauge of jobs currently running. *)
  let running_jobs =
    Gauge.v ~namespace ~subsystem
      ~help:"Number of jobs currently running"
      "running_jobs"

  (* Gauge holding the duration of the most recent healthcheck. *)
  let healthcheck_time =
    Gauge.v ~namespace ~subsystem
      ~help:"Time to perform last healthcheck"
      "healthcheck_time_seconds"

  (* Gauge of unhealthy workers. *)
  let unhealthy =
    Gauge.v ~namespace ~subsystem
      ~help:"Number of unhealthy workers"
      "unhealthy"

  (* Gauge of OBuilder cache hits. *)
  let cache_hits =
    Gauge.v ~namespace ~subsystem
      ~help:"Number of OBuilder cache hits"
      "cache_hits"

  (* Gauge of OBuilder cache misses. *)
  let cache_misses =
    Gauge.v ~namespace ~subsystem
      ~help:"Number of OBuilder cache misses"
      "cache_misses"
end

(* The process environment, as returned by [Unix.environment], with
   "DOCKER_BUILDKIT=1" added as the first entry. Evaluated once, when this
   binding is initialised. *)
let buildkit_env =
  Array.append [| "DOCKER_BUILDKIT=1" |] (Unix.environment ())
Expand Down Expand Up @@ -310,15 +267,24 @@ let loop ~switch ?obuilder t queue =
check_health t ~last_healthcheck ~queue obuilder >>= fun () ->
let outcome, set_outcome = Lwt.wait () in
let log = Log_data.create () in
Log.info (fun f -> f "Requesting a new job…");
Log.info (fun f -> f "Requesting a new job… (%i running)" t.in_use);
let switch = Lwt_switch.create () in
let pop =
Capability.with_ref (Cluster_api.Job.local ~switch ~outcome ~stream_log_data:(Log_data.stream log)) @@ fun job ->
Cluster_api.Queue.pop queue job
in
t.cancel <- (fun () -> Lwt.cancel pop);
pop >>= fun request ->
t.in_use <- t.in_use + 1;
let module R = Cluster_api.Raw.Reader.JobDescr in
let cache_hint = R.cache_hint_get request in
let weights = [
(Str.regexp {|.*tezos.*|}, 2);
(Str.regexp {|.*octez.*|}, 3);
] in
let weight = List.fold_left (fun a (re, w) -> if Str.string_match re cache_hint 0 then w else a) 1 weights in
t.in_use <- t.in_use + weight;
Log.info (fun f -> f "Cache_hint %s" cache_hint);
Log.info (fun f -> f "Job weight %i" weight);
Prometheus.Gauge.set Metrics.running_jobs (float_of_int t.in_use);
Prometheus.Counter.inc_one Metrics.jobs_accepted;
Lwt.async (fun () ->
Expand All @@ -333,6 +299,7 @@ let loop ~switch ?obuilder t queue =
(fun (outcome, metric_label) ->
let t1 = Unix.gettimeofday () in
Prometheus.Summary.observe (Metrics.job_time metric_label) (t1 -. t0);
Log.info (fun f -> f "Build duration: %fs" (t1 -. t0));
Log_data.close log;
Lwt.wakeup set_outcome outcome;
Lwt.return_unit)
Expand All @@ -346,7 +313,7 @@ let loop ~switch ?obuilder t queue =
Lwt.return_unit)
)
(fun () ->
t.in_use <- t.in_use - 1;
t.in_use <- t.in_use - weight;
Prometheus.Gauge.set Metrics.running_jobs (float_of_int t.in_use);
let h, m = cache_stats obuilder in
Prometheus.Gauge.set Metrics.cache_hits (float_of_int h);
Expand Down Expand Up @@ -501,7 +468,7 @@ let self_update ~update t =
Lwt_result.fail (`Msg (Printexc.to_string ex))
)

let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder_prune_item_threshold ?obuilder_prune_limit ?obuilder ?(additional_metrics=[]) ~update ~capacity ~name ~state_dir registration_service =
let run ?switch ?build ?(allow_push=[]) ?(healthcheck_period = 600.0) ?prune_threshold ?docker_max_df_size ?(obuilder_prune_threshold = 30.0) ?(obuilder_prune_limit = 100) ?obuilder ?(additional_metrics=[]) ~update ~capacity ~name ~state_dir registration_service =
begin match prune_threshold, docker_max_df_size with
| None, None -> Log.info (fun f -> f "Prune threshold not set and docker max df size is not. Will not check for low disk-space!")
| None, Some size -> Log.info (fun f -> f "Pruning docker whenever the memory used exceeds %3.2fGB" size)
Expand All @@ -510,7 +477,7 @@ let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?do
end;
begin match obuilder with
| None -> Lwt.return_none
| Some config -> Obuilder_build.create ?prune_threshold:obuilder_prune_threshold ?prune_item_threshold:obuilder_prune_item_threshold ?prune_limit:obuilder_prune_limit config >|= Option.some
| Some config -> Obuilder_build.create ~prune_threshold:obuilder_prune_threshold ~prune_limit:obuilder_prune_limit config >|= Option.some
end >>= fun obuilder ->
let build =
match build with
Expand Down
3 changes: 1 addition & 2 deletions worker/cluster_worker.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ val run :
?switch:Lwt_switch.t ->
?build:build ->
?allow_push:string list ->
healthcheck_period:float ->
?healthcheck_period:float ->
?prune_threshold:float ->
?docker_max_df_size:float ->
?obuilder_prune_threshold:float ->
?obuilder_prune_item_threshold:int64 ->
?obuilder_prune_limit:int ->
?obuilder:Obuilder_config.t ->
?additional_metrics:(string * Uri.t) list ->
Expand Down
2 changes: 1 addition & 1 deletion worker/dune
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
(library
(name cluster_worker)
(public_name ocluster-worker)
(libraries ocluster-api digestif fpath logs capnp-rpc-lwt lwt.unix prometheus-app cohttp-lwt-unix obuilder extunix))
(libraries ocluster-api digestif fpath logs capnp-rpc-lwt lwt.unix prometheus-app cohttp-lwt-unix obuilder extunix str))
45 changes: 45 additions & 0 deletions worker/metrics.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
open Prometheus

(* Prometheus metrics reported by the worker. Every metric below is created
   with namespace "ocluster" and subsystem "worker". *)
let namespace = "ocluster"
let subsystem = "worker"

(* Counter of accepted jobs. *)
let jobs_accepted =
  Counter.v ~namespace ~subsystem
    ~help:"Number of jobs accepted in total"
    "jobs_accepted_total"

(* Summary family of job durations; apply it to a "result" label value to get
   the summary to observe. *)
let job_time =
  Summary.v_label ~namespace ~subsystem ~label_name:"result"
    ~help:"Time jobs ran for"
    "job_time_seconds"

(* Summary of Docker Hub upload durations. *)
let docker_push_time =
  Summary.v ~namespace ~subsystem
    ~help:"Time uploading to Docker Hub"
    "docker_push_time_seconds"

(* Summary of Docker cache pruning durations. *)
let docker_prune_time =
  Summary.v ~namespace ~subsystem
    ~help:"Time spent pruning Docker cache"
    "docker_prune_time_seconds"

(* Gauge of jobs currently running. *)
let running_jobs =
  Gauge.v ~namespace ~subsystem
    ~help:"Number of jobs currently running"
    "running_jobs"

(* Gauge holding the duration of the most recent healthcheck. *)
let healthcheck_time =
  Gauge.v ~namespace ~subsystem
    ~help:"Time to perform last healthcheck"
    "healthcheck_time_seconds"

(* Gauge of unhealthy workers. *)
let unhealthy =
  Gauge.v ~namespace ~subsystem
    ~help:"Number of unhealthy workers"
    "unhealthy"

(* Gauge of OBuilder cache hits. *)
let cache_hits =
  Gauge.v ~namespace ~subsystem
    ~help:"Number of OBuilder cache hits"
    "cache_hits"

(* Gauge of OBuilder cache misses. *)
let cache_misses =
  Gauge.v ~namespace ~subsystem
    ~help:"Number of OBuilder cache misses"
    "cache_misses"

(* Gauge of the percentage of OBuilder space that is free. *)
let obuilder_space_free =
  Gauge.v ~namespace ~subsystem
    ~help:"OBuilder percentage of space free"
    "obuilder_space_free"

98 changes: 25 additions & 73 deletions worker/obuilder_build.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
open Lwt.Infix

(* Don't prune anything used less than 10 minutes ago. Expressed in seconds. *)
let prune_margin = 10.0 *. 60.0

(* A builder packed together with its implementation: a first-class module
   satisfying [Obuilder.BUILDER] and a value of that module's type [t].
   The type variable ['a] is existential, so the pair can only be used by
   pattern-matching on [Builder] and calling the unpacked module's functions
   on the unpacked value. *)
type builder = Builder : (module Obuilder.BUILDER with type t = 'a) * 'a -> builder

module Config = struct
Expand All @@ -18,9 +16,6 @@ type t = {
builder : builder;
mutable pruning : bool;
cond : unit Lwt_condition.t; (* Fires when we finish pruning *)
prune_threshold : float option;
prune_item_threshold : int64 option; (* Threshold number of items to hold in obuilder store *)
prune_limit : int option; (* Number of items to prune from obuilder when threshold is reached *)
}

(* Infix path join: [dir / name] is [Filename.concat dir name].
   From this point on it shadows integer division. *)
let ( / ) dir name = Filename.concat dir name
Expand All @@ -37,7 +32,7 @@ let log_to log_data tag msg =
| `Note -> Log_data.info log_data "\027[01;2m\027[01;35m%a %s\027[0m" pp_timestamp (Unix.gettimeofday ()) msg
| `Output -> Log_data.write log_data msg

let create ?prune_threshold ?prune_item_threshold ?prune_limit config =
let create ?(prune_threshold = 30.0) ?(prune_limit = 100) config =
let { Config.store; sandbox_config } = config in
store >>= fun (Obuilder.Store_spec.Store ((module Store), store)) ->
begin match sandbox_config with
Expand All @@ -58,79 +53,36 @@ let create ?prune_threshold ?prune_item_threshold ?prune_limit config =
| Error (`Msg m) -> Fmt.failwith "Initial OBuilder healthcheck failed: %s" m
| Ok () ->
Log.info (fun f -> f "OBuilder self-test passed");
let r =
{
builder = Builder ((module Builder), builder);
pruning = false;
prune_threshold;
prune_item_threshold;
prune_limit;
cond = Lwt_condition.create ();
}

(* Prune [t] repeatedly until free space is above [prune_threshold] AND the
number of items is below [prune_item_threshold] (both must hold to stop).
Each round calls [Builder.prune] with [prune_limit] and a cut-off time of
[prune_margin] seconds before now, then re-reads the free space and item count.
If a round prunes fewer than [prune_limit] items while a limit is still
exceeded, a warning is logged and the next round is delayed by 600 seconds.
The returned promise resolves only once both limits are satisfied. *)
let do_prune ~prune_threshold ~prune_item_threshold ~prune_limit t =
let Builder ((module Builder), builder) = t.builder in
let rec aux () =
(* Cut-off passed as [~before]: now minus [prune_margin], as a [Unix.tm] from [Unix.gmtime]. *)
let stop = Unix.gettimeofday () -. prune_margin |> Unix.gmtime in
(* [n] is the number of items reported as pruned in this round. *)
Builder.prune builder ~before:stop prune_limit >>= fun n ->
Builder.df builder >>= fun free ->
let count = Builder.count builder in
Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items after pruning %d items" free count n);
if free > prune_threshold && count < prune_item_threshold
then Lwt.return_unit (* Space problem is fixed! *)
else if n < prune_limit then (
(* Fewer items were pruned than requested, so retrying immediately is not expected to help. *)
Log.warn (fun f -> f "Out of space, but nothing left to prune! (will wait and then retry)");
Lwt_unix.sleep 600.0 >>= aux
) else (
(* Continue pruning *)
aux ()
)
in
aux ()

(* Check the free space and/or number of items in [t]'s store.
If less than [t.prune_threshold] or items > [t.prune_item_threshold], spawn a prune operation (if not already running).
If less than half that is remaining, also wait for it to finish.
Returns once there is enough free space to proceed.

Unset limits default to [0.] (free-space threshold) and [Int64.max_int]
(item threshold); those values are treated as "no limit", and if both are unset
the function returns immediately. [t.prune_limit] defaults to 100.
Only the free-space test makes the caller wait; exceeding the item threshold
starts a background prune but does not block. *)
let check_free_space t =
let prune_limit = Option.value t.prune_limit ~default:100 in
let prune_threshold = Option.value t.prune_threshold ~default:0. in
let prune_item_threshold = Option.value t.prune_item_threshold ~default:Int64.max_int in
if prune_threshold = 0. && prune_item_threshold = Int64.max_int then
Lwt.return_unit (* No limits have been set *)
else
let Builder ((module Builder), builder) = t.builder in
let rec aux () =
Builder.df builder >>= fun free ->
let count = Builder.count builder in
Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items" free count);
(* If we're low on space, or over the threshold number of items spawn a pruning thread. *)
if ((prune_threshold > 0. && free < prune_threshold) ||
(prune_item_threshold < Int64.max_int && count > prune_item_threshold)) && not t.pruning then (
(* Mark pruning as in progress before spawning, so later calls do not start a second prune. *)
t.pruning <- true;
Lwt.async (fun () ->
Lwt.finalize
(fun () -> do_prune ~prune_threshold ~prune_item_threshold ~prune_limit t)
(fun () ->
(* Runs whether [do_prune] succeeds or fails: clear the flag and wake every waiter. *)
Lwt.pause () >|= fun () ->
t.pruning <- false;
Lwt_condition.broadcast t.cond ()
)
);
);
if free < prune_threshold /. 2.0 then (
(* A prune should be running here: this test implies [free < prune_threshold]
(assuming [free] is never negative), so one was already in progress or was just started above. *)
assert (t.pruning);
Log.info (fun f -> f "OBuilder space very low. Waiting for prune to finish…");
(* Once woken by the broadcast, re-check from the top rather than assuming space is now sufficient. *)
Lwt_condition.wait t.cond >>= aux
) else (
Lwt.return_unit
)
in
aux ()
} in
Lwt.async (fun () ->
let rec loop () =
Builder.df builder >>= fun free ->
let count = Builder.count builder in
Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items" free count);
Prometheus.Gauge.set Metrics.obuilder_space_free free;
if free > prune_threshold then (
r.pruning <- false;
Lwt_condition.signal r.cond (); (* release one waiting process *)
Lwt_unix.sleep 30.0 >>= fun () -> loop ()
) else (
r.pruning <- true;
let stop = Unix.gettimeofday () |> Unix.gmtime in
Builder.prune builder ~before:stop prune_limit >>= fun n ->
Log.info (fun f -> f "Pruned %i items" n);
(if n = 0 then Lwt_unix.sleep 30.0
else Lwt.return_unit )>>= fun () -> loop ()
)
in loop ()
); r

let build t ~switch ~log ~spec ~src_dir ~secrets =
check_free_space t >>= fun () ->
(if t.pruning then Lwt_condition.wait t.cond
else Lwt.return ()) >>= fun () ->
let log = log_to log in
let context = Obuilder.Context.v ~switch ~log ~src_dir ~secrets () in
let Builder ((module Builder), builder) = t.builder in
Expand Down
Loading
Loading