Skip to content

Commit

Permalink
Merge pull request #335 from terrateamio/334-fix-latency-on-repos-wit…
Browse files Browse the repository at this point in the history
…h-large-number-of-files

#334 FIX Offload computationally expensive work for large repos into …
  • Loading branch information
bender2352 authored Mar 5, 2025
2 parents ce0b62f + f7400e7 commit 237c74a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 64 deletions.
80 changes: 41 additions & 39 deletions code/src/abb_scheduler_kqueue/abb_scheduler_kqueue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,19 @@ module El = struct
try Kqueue.kevent t.kq ~changelist ~eventlist:t.eventlist ~timeout
with Unix.Unix_error (Unix.EINTR, _, _) -> 0
in
assert (ret >= 0);
let t = { t with curr_time = Unix.gettimeofday (); mono_time = Mtime_clock.elapsed () } in
let s = Abb_fut.State.set_state t s in
let s =
Kqueue.Eventlist.fold
~f:(fun s event ->
let t = Abb_fut.State.state s in
match Kqueue.Event.of_kevent event with
| Kqueue.Event.Read r
when Fd_map.mem
(Kqueue.unsafe_file_descr_of_int r.Kqueue.Event.Read.descr)
t.change_read ->
(* If the FD being processed now is in the change_read or
if ret >= 0 then (
let t = { t with curr_time = Unix.gettimeofday (); mono_time = Mtime_clock.elapsed () } in
let s = Abb_fut.State.set_state t s in
let s =
Kqueue.Eventlist.fold
~f:(fun s event ->
let t = Abb_fut.State.state s in
match Kqueue.Event.of_kevent event with
| Kqueue.Event.Read r
when Fd_map.mem
(Kqueue.unsafe_file_descr_of_int r.Kqueue.Event.Read.descr)
t.change_read ->
(* If the FD being processed now is in the change_read or
change_write map, then means we have done something in this
iteration of the loop where a previously handled FD has
modified something related to this FD. This is most likely an
Expand All @@ -248,32 +248,34 @@ module El = struct
already out of the kqueue. So we just remove the change
operation and continue on. That away we don't try to delete
what is already not there. *)
let fd = Kqueue.unsafe_file_descr_of_int r.Kqueue.Event.Read.descr in
assert (Fd_map.find fd t.change_read = `Del);
let t = { t with change_read = Fd_map.remove fd t.change_read } in
Abb_fut.State.set_state t s
| Kqueue.Event.Write w
when Fd_map.mem
(Kqueue.unsafe_file_descr_of_int w.Kqueue.Event.Write.descr)
t.change_write ->
(* See the comment above in Read. *)
let fd = Kqueue.unsafe_file_descr_of_int w.Kqueue.Event.Write.descr in
assert (Fd_map.find fd t.change_write = `Del);
let t = { t with change_write = Fd_map.remove fd t.change_write } in
Abb_fut.State.set_state t s
| Kqueue.Event.Read r ->
dispatch_read (Kqueue.unsafe_file_descr_of_int r.Kqueue.Event.Read.descr) s
| Kqueue.Event.Write w ->
dispatch_write (Kqueue.unsafe_file_descr_of_int w.Kqueue.Event.Write.descr) s
| _ -> s)
~init:s
t.eventlist
in
let s = dispatch_timers s in
let end_time = Mtime_clock.elapsed () in
let duration = Mtime.Span.(to_float_ns (abs_diff end_time t.mono_time) /. sec_ns) in
(Abb_fut.State.state s).exec_duration duration;
s
let fd = Kqueue.unsafe_file_descr_of_int r.Kqueue.Event.Read.descr in
assert (Fd_map.find fd t.change_read = `Del);
let t = { t with change_read = Fd_map.remove fd t.change_read } in
Abb_fut.State.set_state t s
| Kqueue.Event.Write w
when Fd_map.mem
(Kqueue.unsafe_file_descr_of_int w.Kqueue.Event.Write.descr)
t.change_write ->
(* See the comment above in Read. *)
let fd = Kqueue.unsafe_file_descr_of_int w.Kqueue.Event.Write.descr in
assert (Fd_map.find fd t.change_write = `Del);
let t = { t with change_write = Fd_map.remove fd t.change_write } in
Abb_fut.State.set_state t s
| Kqueue.Event.Read r ->
dispatch_read (Kqueue.unsafe_file_descr_of_int r.Kqueue.Event.Read.descr) s
| Kqueue.Event.Write w ->
dispatch_write (Kqueue.unsafe_file_descr_of_int w.Kqueue.Event.Write.descr) s
| _ -> s)
~init:s
t.eventlist
in
let s = dispatch_timers s in
let end_time = Mtime_clock.elapsed () in
let duration = Mtime.Span.(to_float_ns (abs_diff end_time t.mono_time) /. sec_ns) in
(Abb_fut.State.state s).exec_duration duration;
s)
else if Sys.getenv_opt "ABB_SCHEDULER_DEBUG" = Some "true" then s
else assert false
let rec loop s done_fut =
match Future.state done_fut with
Expand Down
52 changes: 27 additions & 25 deletions code/src/terrat_evaluator3/terrat_evaluator3.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1884,25 +1884,25 @@ module Make (S : Terrat_vcs_provider.S) = struct
~file_list:repo_tree
repo_config
in
Abb.Future.return
(compute_matches
~ctx:
(Terrat_base_repo_config_v1.Ctx.make
~dest_branch:(S.Ref.to_string base_branch_name)
~branch:(S.Ref.to_string branch_name)
())
~repo_config
~tag_query
~out_of_change_applies
~applied_dirspaces
~diff
~repo_tree
~index:
(CCOption.map_or
~default:Terrat_base_repo_config_v1.Index.empty
(fun { Terrat_vcs_provider.Index.index; _ } -> index)
index)
())
Abb.Thread.run (fun () ->
compute_matches
~ctx:
(Terrat_base_repo_config_v1.Ctx.make
~dest_branch:(S.Ref.to_string base_branch_name)
~branch:(S.Ref.to_string branch_name)
())
~repo_config
~tag_query
~out_of_change_applies
~applied_dirspaces
~diff
~repo_tree
~index:
(CCOption.map_or
~default:Terrat_base_repo_config_v1.Index.empty
(fun { Terrat_vcs_provider.Index.index; _ } -> index)
index)
())
>>= fun (working_set_matches, all_matches, all_tag_query_matches, all_unapplied_matches) ->
pull_request_safe ctx state
>>= function
Expand Down Expand Up @@ -2095,12 +2095,14 @@ module Make (S : Terrat_vcs_provider.S) = struct
~index:Terrat_base_repo_config_v1.Index.empty
repo_config)
>>= fun config ->
let matches =
CCList.flatten
(Terrat_change_match3.match_diff_list
config
(CCList.map (fun filename -> Terrat_change.Diff.(Change { filename })) repo_tree))
in
let open Abb.Future.Infix_monad in
Abb.Thread.run (fun () ->
CCList.flatten
(Terrat_change_match3.match_diff_list
config
(CCList.map (fun filename -> Terrat_change.Diff.(Change { filename })) repo_tree)))
>>= fun matches ->
let open Abbs_future_combinators.Infix_result_monad in
let workflows = Terrat_base_repo_config_v1.workflows repo_config in
let dirspaceflows =
CCList.map
Expand Down

0 comments on commit 237c74a

Please sign in to comment.