@@ -144,6 +144,21 @@ module Registry = struct
144144 Hashtbl. clear combinators;
145145 dirty_nodes := []
146146
147+ let reset_stats () =
148+ Hashtbl. iter
149+ (fun _ info ->
150+ info.stats.deltas_received < - 0 ;
151+ info.stats.entries_received < - 0 ;
152+ info.stats.adds_received < - 0 ;
153+ info.stats.removes_received < - 0 ;
154+ info.stats.process_count < - 0 ;
155+ info.stats.process_time_ns < - 0L ;
156+ info.stats.deltas_emitted < - 0 ;
157+ info.stats.entries_emitted < - 0 ;
158+ info.stats.adds_emitted < - 0 ;
159+ info.stats.removes_emitted < - 0 )
160+ nodes
161+
147162 (* * Generate Mermaid diagram of the pipeline *)
148163 let to_mermaid () =
149164 let buf = Buffer. create 256 in
@@ -460,8 +475,8 @@ let flatMap ~name (src : ('k1, 'v1) t) ~f ?merge () : ('k2, 'v2) t =
460475 Hashtbl. remove provenance k1;
461476 List. iter
462477 (fun k2 ->
463- match Hashtbl. find_opt contributions k2 with
464- | None -> ()
478+ match Hashtbl. find_opt contributions k2 with
479+ | None -> ()
465480 | Some contribs -> Hashtbl. remove contribs k1)
466481 target_keys;
467482 target_keys
@@ -472,37 +487,37 @@ let flatMap ~name (src : ('k1, 'v1) t) ~f ?merge () : ('k2, 'v2) t =
472487 Hashtbl. replace provenance k1 target_keys;
473488 List. iter
474489 (fun (k2 , v2 ) ->
475- let contribs =
476- match Hashtbl. find_opt contributions k2 with
477- | Some c -> c
478- | None ->
479- let c = Hashtbl. create 4 in
480- Hashtbl. replace contributions k2 c;
481- c
482- in
490+ let contribs =
491+ match Hashtbl. find_opt contributions k2 with
492+ | Some c -> c
493+ | None ->
494+ let c = Hashtbl. create 4 in
495+ Hashtbl. replace contributions k2 c;
496+ c
497+ in
483498 Hashtbl. replace contribs k1 v2)
484499 entries;
485500 target_keys
486501 in
487502
488503 let process_entry (k1 , v1_opt ) =
489- let old_affected = remove_source k1 in
504+ let old_affected = remove_source k1 in
490505 let new_affected =
491506 match v1_opt with
492507 | None -> []
493508 | Some v1 ->
494509 let entries = f k1 v1 in
495510 add_source k1 entries
496511 in
497- let all_affected = old_affected @ new_affected in
512+ let all_affected = old_affected @ new_affected in
498513 (* Deduplicate *)
499- let seen = Hashtbl. create (List. length all_affected) in
514+ let seen = Hashtbl. create (List. length all_affected) in
500515 List. filter_map
501516 (fun k2 ->
502- if Hashtbl. mem seen k2 then None
503- else (
504- Hashtbl. replace seen k2 () ;
505- recompute_target k2))
517+ if Hashtbl. mem seen k2 then None
518+ else (
519+ Hashtbl. replace seen k2 () ;
520+ recompute_target k2))
506521 all_affected
507522 in
508523
@@ -626,8 +641,8 @@ let join ~name (left : ('k1, 'v1) t) (right : ('k2, 'v2) t) ~key_of ~f ?merge ()
626641 Hashtbl. remove provenance k1;
627642 List. iter
628643 (fun k3 ->
629- match Hashtbl. find_opt contributions k3 with
630- | None -> ()
644+ match Hashtbl. find_opt contributions k3 with
645+ | None -> ()
631646 | Some contribs -> Hashtbl. remove contribs k1)
632647 target_keys;
633648 target_keys
@@ -638,14 +653,14 @@ let join ~name (left : ('k1, 'v1) t) (right : ('k2, 'v2) t) ~key_of ~f ?merge ()
638653 Hashtbl. replace provenance k1 target_keys;
639654 List. iter
640655 (fun (k3 , v3 ) ->
641- let contribs =
642- match Hashtbl. find_opt contributions k3 with
643- | Some c -> c
644- | None ->
645- let c = Hashtbl. create 4 in
646- Hashtbl. replace contributions k3 c;
647- c
648- in
656+ let contribs =
657+ match Hashtbl. find_opt contributions k3 with
658+ | Some c -> c
659+ | None ->
660+ let c = Hashtbl. create 4 in
661+ Hashtbl. replace contributions k3 c;
662+ c
663+ in
649664 Hashtbl. replace contribs k1 v3)
650665 entries;
651666 target_keys
@@ -742,7 +757,7 @@ let join ~name (left : ('k1, 'v1) t) (right : ('k2, 'v2) t) ~key_of ~f ?merge ()
742757 | Some left_keys ->
743758 List. iter
744759 (fun k1 ->
745- match Hashtbl. find_opt left_entries k1 with
760+ match Hashtbl. find_opt left_entries k1 with
746761 | Some v1 ->
747762 let affected = process_left_entry k1 v1 in
748763 all_affected := affected @ ! all_affected
@@ -975,7 +990,7 @@ let fixpoint ~name ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () :
975990 let frontier = Queue. create () in
976991
977992 (* Start from all roots *)
978- Hashtbl. iter
993+ Hashtbl. iter
979994 (fun k () ->
980995 Hashtbl. replace new_current k () ;
981996 Queue. add k frontier)
@@ -985,9 +1000,9 @@ let fixpoint ~name ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () :
9851000 while not (Queue. is_empty frontier) do
9861001 let k = Queue. pop frontier in
9871002 match Hashtbl. find_opt edge_map k with
988- | None -> ()
1003+ | None -> ()
9891004 | Some successors ->
990- List. iter
1005+ List. iter
9911006 (fun succ ->
9921007 if not (Hashtbl. mem new_current succ) then (
9931008 Hashtbl. replace new_current succ () ;
@@ -1025,7 +1040,7 @@ let fixpoint ~name ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () :
10251040 let needs_full_recompute = ref false in
10261041
10271042 (* Apply edge updates *)
1028- List. iter
1043+ List. iter
10291044 (fun (k , v_opt ) ->
10301045 match v_opt with
10311046 | Some successors ->
@@ -1041,7 +1056,7 @@ let fixpoint ~name ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () :
10411056 edges_entries;
10421057
10431058 (* Apply init updates *)
1044- List. iter
1059+ List. iter
10451060 (fun (k , v_opt ) ->
10461061 match v_opt with
10471062 | Some () -> Hashtbl. replace roots k ()
@@ -1057,7 +1072,7 @@ let fixpoint ~name ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () :
10571072 let new_current = recompute_all () in
10581073
10591074 (* Find removed entries *)
1060- Hashtbl. iter
1075+ Hashtbl. iter
10611076 (fun k () ->
10621077 if not (Hashtbl. mem new_current k) then
10631078 output_entries := (k, None ) :: ! output_entries)
@@ -1091,7 +1106,7 @@ let fixpoint ~name ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () :
10911106 match Hashtbl. find_opt edge_map k with
10921107 | None -> ()
10931108 | Some successors ->
1094- List. iter
1109+ List. iter
10951110 (fun succ ->
10961111 if not (Hashtbl. mem current succ) then (
10971112 Hashtbl. replace current succ () ;
@@ -1167,3 +1182,4 @@ let fixpoint ~name ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () :
11671182let to_mermaid () = Registry. to_mermaid ()
11681183let print_stats () = Registry. print_stats ()
11691184let reset () = Registry. clear ()
1185+ let reset_stats () = Registry. reset_stats ()
0 commit comments