Fiber: add map-reduce helpers #14503
Add map_reduce_seq, map_reduce_array, and map_reduce for running fibers concurrently over sequences, arrays, and lists, then combining their results with an explicit ~combine function.

Expose the helpers through Memo, cover each collection shape with expect tests, and migrate order-insensitive reductions over dependency sets, path sets, and cached table entries to the new helpers.

Signed-off-by: Rudi Grinberg <me@rgrinberg.com>
4371d68 to
e3ba948
    val map_reduce_seq
      : 'a Seq.t
      -> f:('a -> 'b t)
      -> empty:'b
I wonder whether init would be a better name, since at a few call sites the value passed as ~empty is not actually empty.
      : 'a Seq.t
      -> f:('a -> 'b t)
      -> empty:'b
      -> combine:('b -> 'b -> 'b)
At many of the call sites, the fibers were previously mapped in order and then reduced. Now, IIUC, results are combined in the order the fibers finish. This seems fine for these applications, but it means that combine should be associative and commutative for this to work, as Map.union is. Might be worth adding a comment.
If not in Fiber, it might make more sense to mention this in Memo.
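To make the associativity/commutativity point concrete, here is a minimal stdlib-only sketch. It does not use Fiber at all: `reduce_in_order` is a hypothetical stand-in that folds already-computed results in a given order, which is an assumption about how a completion-order reduction behaves rather than the code in this PR.

```ocaml
(* Hypothetical stand-in for a completion-order reduction: fold the
   results in whatever order they are handed to us. Not Fiber's code. *)
let reduce_in_order ~empty ~combine results =
  List.fold_left combine empty results

module IntSet = Set.Make (Int)

let () =
  (* The same three results, in submission order and in a different
     (simulated) completion order. *)
  let submitted = [ [ 1 ]; [ 2 ]; [ 3 ] ] in
  let completed = [ [ 3 ]; [ 2 ]; [ 1 ] ] in
  (* List.append is associative but not commutative, so the final
     list depends on the order in which results arrive. *)
  let a = reduce_in_order ~empty:[] ~combine:List.append submitted in
  let b = reduce_in_order ~empty:[] ~combine:List.append completed in
  assert (a <> b);
  (* Set union is associative and commutative, so both orders agree. *)
  let to_sets = List.map IntSet.of_list in
  let sa =
    reduce_in_order ~empty:IntSet.empty ~combine:IntSet.union (to_sets submitted)
  in
  let sb =
    reduce_in_order ~empty:IntSet.empty ~combine:IntSet.union (to_sets completed)
  in
  assert (IntSet.equal sa sb)
```

With an order-sensitive combine such as List.append the two runs disagree, while a union-style combine gives the same value regardless of arrival order.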
Here is a test you can add that asserts completion order:

    let%expect_test "map_reduce combines in fiber completion order" =
      let test =
        let iv1 = Fiber.Ivar.create () in
        let iv2 = Fiber.Ivar.create () in
        let iv3 = Fiber.Ivar.create () in
        Fiber.fork_and_join_unit
          (fun () ->
            let+ res =
              Fiber.map_reduce
                [ 1, iv1; 2, iv2; 3, iv3 ]
                ~f:(fun (n, ivar) ->
                  let+ () = Fiber.Ivar.read ivar in
                  printfn "completed: %d" n;
                  [ n ])
                ~empty:[]
                ~combine:List.append
            in
            printfn "result: [%s]" (String.concat ~sep:"; " (List.map res ~f:string_of_int)))
          (fun () ->
            let* () = Fiber.Ivar.fill iv3 () in
            let* () = Fiber.Ivar.fill iv2 () in
            Fiber.Ivar.fill iv1 ())
      in
      Scheduler.run test;
      [%expect
        {|
        completed: 3
        completed: 2
        completed: 1
        result: [3; 2; 1] |}]
    ;;
    | [] -> k empty
    | x :: l ->
      let current = ref empty in
      let running = ref (List.length l + 1) in
This does two passes over the list. Do you think there is something we can do with nfork similar to the seq variant where we can avoid the double pass and just increment as we traverse? I didn't think too hard about it tho, and it probably isn't a huge cost in practice.
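One way the single-pass idea could look, as a rough sketch only: the counter starts at 1 to represent the traversal itself and is incremented as each element is started, so List.length is never called. Everything here is hypothetical and callback-based (`task`, `map_reduce_single_pass`, and the `~k` continuation are illustrative names); it is not Fiber's nfork mechanism nor the code in this PR.

```ocaml
(* A task takes a completion callback and eventually calls it once.
   Hypothetical toy type, not part of Fiber. *)
type 'a task = ('a -> unit) -> unit

let map_reduce_single_pass (l : 'a list) ~(f : 'a -> 'b task) ~empty ~combine
    ~(k : 'b -> unit) =
  let current = ref empty in
  (* Start at 1: the extra slot stands for the traversal, so the count
     cannot reach zero while elements are still being started. *)
  let running = ref 1 in
  let finish_one () =
    decr running;
    if !running = 0 then k !current
  in
  List.iter
    (fun x ->
      incr running;
      f x (fun y ->
          current := combine !current y;
          finish_one ()))
    l;
  (* Release the traversal's slot. If every task already finished (or
     the list was empty), this is what calls [k]. *)
  finish_one ()

let () =
  (* Tasks that only complete later, when the queue is drained. *)
  let pending = Queue.create () in
  let defer n : int task = fun finished -> Queue.add (fun () -> finished n) pending in
  let result = ref None in
  map_reduce_single_pass [ 1; 2; 3 ] ~f:defer ~empty:0 ~combine:( + )
    ~k:(fun r -> result := Some r);
  assert (!result = None);
  Queue.iter (fun run -> run ()) pending;
  assert (!result = Some 6)
```

The trade-off is one extra decrement at the end of the traversal in exchange for not walking the list twice; as noted above, the saving is probably small in practice.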
Alizter left a comment
I have some comments, but nothing blocking. LGTM