diff --git a/service/domain_worker.ml b/service/domain_worker.ml index dab3cdb..6df1f5c 100644 --- a/service/domain_worker.ml +++ b/service/domain_worker.ml @@ -11,7 +11,7 @@ type request = { cancelled : unit Eio.Promise.t option; } -type reply = ((OpamPackage.t list, string) result * float, [`Msg of string]) result +type reply = ((OpamPackage.t list, string) result * float, [`Msg of string | `Cancelled]) result let env (vars : Worker.Vars.t) v = match v with @@ -31,7 +31,7 @@ let env (vars : Worker.Vars.t) v = let solve { packages; root_pkgs; pinned_pkgs; vars; cancelled } = match cancelled with - | Some p when Promise.is_resolved p -> Error (`Msg "Cancelled") + | Some p when Promise.is_resolved p -> Error `Cancelled | _ -> try let pins = root_pkgs @ pinned_pkgs |> OpamPackage.Name.Map.of_list in diff --git a/service/domain_worker.mli b/service/domain_worker.mli index 9e51e24..502b38e 100644 --- a/service/domain_worker.mli +++ b/service/domain_worker.mli @@ -15,10 +15,11 @@ type request = { (** If resolved, the result is not needed. *) } -type reply = ((OpamPackage.t list, string) result * float, [`Msg of string]) result +type reply = ((OpamPackage.t list, string) result * float, [`Msg of string | `Cancelled]) result (** [Ok (Ok selection)] if there is a solution. [Ok (Error msg)] if there is no solution. - [Error msg] if the request was invalid. *) + [Error msg] if the request was invalid. + [Error Cancelled] if the request was cancelled before started. *) val env : Solver_service_api.Worker.Vars.t -> string -> OpamVariable.variable_contents option (** [env vars name] is the value of [name] in [vars]. *) diff --git a/service/pool.ml b/service/pool.ml index 6ba96ea..6f5c6d7 100644 --- a/service/pool.ml +++ b/service/pool.ml @@ -1,14 +1,16 @@ open Eio.Std -type ('request, 'reply) t = ('request * 'reply Promise.u) Eio.Stream.t +type ('request, 'reply) t = { + requests: ('request * 'reply Promise.u) Eio.Stream.t; running: int Atomic.t; n_workers: int +} let rec run_worker t handle = - let request, set_reply = Eio.Stream.take t in + let request, set_reply = Eio.Stream.take t.requests in handle request |> Promise.resolve set_reply; run_worker t handle let create ~sw ~domain_mgr ~n_workers handle = - let t = Eio.Stream.create 0 in + let t = { requests = Eio.Stream.create max_int; running = Atomic.make 0; n_workers } in for _i = 1 to n_workers do Fiber.fork_daemon ~sw (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> run_worker t handle) @@ -18,5 +20,9 @@ let create ~sw ~domain_mgr ~n_workers handle = let use t request = let reply, set_reply = Promise.create () in - Eio.Stream.add t (request, set_reply); + Eio.Stream.add t.requests (request, set_reply); Promise.await reply + +let n_workers t = t.n_workers + +let wait_requests t = Eio.Stream.length t.requests diff --git a/service/solver.ml b/service/solver.ml index 160d9da..4a4ec85 100644 --- a/service/solver.ml +++ b/service/solver.ml @@ -12,6 +12,54 @@ type t = { let ocaml = OpamPackage.Name.of_string "ocaml" + +module Metrics = struct + open Prometheus + + let namespace = "ocluster" + let subsystem = "solver" + + let request_handling_total = + let help = "Total number of handled requests" in + Counter.v ~help ~namespace ~subsystem "requests_handled_total" + + let request_handling = + let help = "Number of handled requests by state" in + Gauge.v_label ~label_name:"state" ~help ~namespace ~subsystem "request_state" + + let update_request_handling pool n_requests = + let workers = Pool.n_workers pool in + let waiting = Pool.wait_requests pool in + let running = min !n_requests workers in + Gauge.set (request_handling "running") (float_of_int running); + Gauge.set (request_handling "waiting") (float_of_int waiting); + Gauge.set (request_handling "capacity") (float_of_int workers) + + let request_ok = + let help = "Total number of success requests" in + Counter.v ~help ~namespace ~subsystem "success" + + let request_fail = + let help = "Total number of fail requests" in + Counter.v ~help ~namespace ~subsystem "failed" + + let request_no_solution = + let help = "Total number of no solution requests " in + Counter.v ~help ~namespace ~subsystem "no_solution" + + let request_cancel_waiting = + let help = "Total number of cancel when the requests are waiting" in + Counter.v ~help ~namespace ~subsystem "cancel_waiting" + + let request_cancel_running = + let help = "Total number of cancel when the requests are running" in + Counter.v ~help ~namespace ~subsystem "cancel_running" + + let request_timing = + let help = "Total time spent finding solutions" in + Summary.v ~help ~namespace ~subsystem "timing" +end + (* If a local package has a literal constraint on OCaml's version and it doesn't match the platform, we just remove that package from the set to test, so other packages can still be tested. *) @@ -47,13 +95,16 @@ let solve_for_platform ?cancelled t ~log ~opam_repository_commits ~packages ~roo ) else ( let slice = { Domain_worker.vars; root_pkgs; packages; pinned_pkgs; cancelled } in match Pool.use t.pool slice with + | Error `Cancelled -> Error `Cancelled | Error (`Msg m) -> Error (`Msg m) | Ok (results, time) -> match results with | Error e -> + Prometheus.Summary.observe Metrics.request_timing time; Log.info log "%s: eliminated all possibilities in %.2f s" id time; Error (`No_solution e) | Ok packages -> + Prometheus.Summary.observe Metrics.request_timing time; Log.info log "%s: found solution in %.2f s" id time; let repo_packages = packages @@ -115,12 +166,18 @@ let solve ?cancelled t ~log request = in Log.info log "Solving for %a" Fmt.(list ~sep:comma string) root_pkgs; let serious_errors = ref [] in + let cancel_waiting = ref 0 in + let n_requests = ref 0 in + let pool = t.pool in let*! root_pkgs = parse_opams request.root_pkgs in let*! pinned_pkgs = parse_opams request.pinned_pkgs in let*! packages = Stores.packages t.stores opam_repository_commits in let results = platforms |> Fiber.List.map (fun (id, vars) -> + Prometheus.Counter.inc_one Metrics.request_handling_total; + incr n_requests; + Metrics.update_request_handling pool n_requests; let result = solve_for_platform t id ?cancelled @@ -132,12 +189,16 @@ let solve ?cancelled t ~log request = ~pins ~vars in + Metrics.update_request_handling pool n_requests; (id, result) ) |> List.filter_map (fun (id, result) -> Log.info log "= %s =" id; + decr n_requests; + Metrics.update_request_handling pool n_requests; match result with | Ok result -> + Prometheus.Counter.inc_one Metrics.request_ok; Log.info log "-> @[%a@]" Fmt.(list ~sep:sp string) result.Selection.packages; @@ -145,17 +206,27 @@ let solve ?cancelled t ~log request = Fmt.(list ~sep:semi (pair ~sep:comma string string)) result.Selection.commits; Some result + | Error `Cancelled -> + Prometheus.Counter.inc_one Metrics.request_cancel_waiting; + incr cancel_waiting; + Log.info log "%s" "Cancelled"; + None | Error (`No_solution msg) -> + Prometheus.Counter.inc_one Metrics.request_no_solution; Log.info log "%s" msg; None | Error (`Msg msg) -> + Prometheus.Counter.inc_one Metrics.request_fail; Log.info log "%s" msg; serious_errors := msg :: !serious_errors; None ) in match cancelled with - | Some p when Promise.is_resolved p -> Error `Cancelled + | Some p when Promise.is_resolved p -> + let cancels = (List.length platforms) - (!cancel_waiting) in + Prometheus.Counter.inc Metrics.request_cancel_running (float_of_int cancels); + Error `Cancelled | _ -> match !serious_errors with | [] -> Ok results