Skip to content

Commit

Permalink
Merge pull request #13 from terrateamio/pro-589-fix-store-repository-…
Browse files Browse the repository at this point in the history
…performance

Pro 589 fix store repository performance
  • Loading branch information
orbitz authored Oct 12, 2024
2 parents 6ad8658 + 747978e commit ba78208
Show file tree
Hide file tree
Showing 6 changed files with 643 additions and 82 deletions.
12 changes: 12 additions & 0 deletions code/pds.conf
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,15 @@ deps = [ "abb_channel", "abb_channel_queue", "abb_future_combinators", "abb_happ
[src.abb_thread_pool]
install = true

[src.abb_keyed_concurrent_executor]
install = true
deps = [
"abb_future_combinators",
"abb_intf",
"abb_service_local",
"containers",
]

[src.json_schema_conv]
install = false
deps = [ "compiler-libs.common", "containers", "ocaml-compiler-libs.common", "ocaml-compiler-libs.shadow", "ppx_deriving", "ppx_deriving.eq", "ppx_deriving.ord", "ppx_deriving.show", "ppx_deriving_yojson", "yojson",]
Expand Down Expand Up @@ -662,6 +671,9 @@ deps = [ "oth", "otls",]
[tests.abb_future_combinators]
deps = [ "oth", "abb_intf", "abb_fut", "abb_future_combinators",]

[tests.abb_keyed_concurrent_executor]
deps = [ "oth", "abb_intf", "abb_fut", "containers", "abb_keyed_concurrent_executor" ]

[tests.oth]
deps = [ "oth",]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
type enqueue_err = [ `Closed ] [@@deriving show]

module Queue : sig
type 'a t

val empty : 'a t
val of_list : 'a list -> 'a t
val to_list : 'a t -> 'a list
val enqueue : 'a -> 'a t -> 'a t
end = struct
type 'a t = {
front : 'a list;
rear : 'a list;
}

let empty = { front = []; rear = [] }
let of_list front = { front; rear = [] }
let to_list { front; rear } = front @ CCList.rev rear
let enqueue v t = { t with rear = v :: t.rear }
end

module Make (Fut : Abb_intf.Future.S) (Key : Map.OrderedType) = struct
module Fut_comb = Abb_future_combinators.Make (Fut)
module Channel = Abb_channel.Make (Fut)
module Service = Abb_service_local.Make (Fut)
module Key_set = CCSet.Make (Key)

module Task = struct
type 'a t = {
keys : Key.t list;
task : 'a;
}
end

module Msg = struct
type 'a t =
| Enqueue of 'a Task.t
| Drain of unit Fut.Promise.t
| Completed of { keys : Key.t list }
end

module Server = struct
type 'a t = {
slots : int;
used_slots : int;
f : 'a -> unit Fut.t;
locked_keys : Key_set.t;
queue : 'a Task.t Queue.t;
}

let make slots f =
{ slots; used_slots = 0; f; locked_keys = Key_set.empty; queue = Queue.empty }

let exec task f w =
Fut_comb.ignore
(Fut.fork
(Fut_comb.with_finally
(fun () -> Fut_comb.ignore (f task.Task.task))
~finally:(fun () ->
Fut_comb.ignore (Channel.send w (Msg.Completed { keys = task.Task.keys })))))

let task_can_run would_lock_keys locked_keys keys =
(* Task can run if all none of its keys exist in the locked keys. *)
(not (CCList.exists (CCFun.flip Key_set.mem locked_keys) keys))
&& not (CCList.exists (CCFun.flip Key_set.mem would_lock_keys) keys)

let rec exec_available_work' w t would_lock_keys acc = function
| [] -> Fut.return { t with queue = Queue.of_list (CCList.rev acc) }
| tasks when t.slots <= t.used_slots ->
Fut.return { t with queue = Queue.of_list (CCList.rev acc @ tasks) }
| task :: tasks when task_can_run would_lock_keys t.locked_keys task.Task.keys ->
let open Fut.Infix_monad in
let locked_keys = Key_set.add_list t.locked_keys task.Task.keys in
exec task t.f w
>>= fun () ->
exec_available_work'
w
{ t with used_slots = t.used_slots + 1; locked_keys }
would_lock_keys
acc
tasks
| task :: tasks ->
exec_available_work'
w
t
(Key_set.add_list would_lock_keys task.Task.keys)
(task :: acc)
tasks

let exec_available_work w t = exec_available_work' w t Key_set.empty [] (Queue.to_list t.queue)

let rec loop_drain drains t w r =
let open Fut.Infix_monad in
Channel.recv r
>>= function
| `Ok (Msg.Drain drain) -> loop_drain (drain :: drains) t w r
| `Ok (Msg.Completed { keys }) -> (
let locked_keys = CCList.fold_left (CCFun.flip Key_set.remove) t.locked_keys keys in
let t = { t with used_slots = t.used_slots - 1; locked_keys } in
exec_available_work w t
>>= function
| { used_slots = 0; _ } -> Fut_comb.List.iter ~f:(CCFun.flip Fut.Promise.set ()) drains
| t -> loop_drain drains t w r)
| `Ok (Msg.Enqueue _) -> assert false
| `Closed -> assert false

let rec loop t w r =
let open Fut.Infix_monad in
Channel.recv r
>>= function
| `Ok (Msg.Enqueue task) ->
let open Fut.Infix_monad in
let t = { t with queue = Queue.enqueue task t.queue } in
exec_available_work w t >>= fun t -> loop t w r
| `Ok (Msg.Drain drain) when t.used_slots = 0 ->
(* No work in flight? Mark as drained. *)
Fut.Promise.set drain ()
| `Ok (Msg.Drain drain) -> loop_drain [ drain ] t w r
| `Ok (Msg.Completed { keys }) ->
let locked_keys = CCList.fold_left (CCFun.flip Key_set.remove) t.locked_keys keys in
let t = { t with used_slots = t.used_slots - 1; locked_keys } in
exec_available_work w t >>= fun t -> loop t w r
| `Closed -> Fut.return ()
end

type 'a t = {
mutable draining : bool;
w : 'a Msg.t Service.w;
}

let create ~slots f =
let open Fut.Infix_monad in
Service.create (Server.loop (Server.make slots f))
>>= fun w -> Fut.return { draining = false; w }

let enqueue { draining; w } ~keys task =
if draining then Fut.return (Error `Closed)
else
let open Fut.Infix_monad in
Channel.send w (Msg.Enqueue Task.{ keys; task })
>>= function
| `Ok () -> Fut.return (Ok ())
| `Closed -> Fut.return (Error `Closed)

let destroy { w; _ } = Channel.close w

let drain_and_destroy ({ w; _ } as t) =
let open Fut.Infix_monad in
let d = Fut.Promise.create () in
Channel.send w (Msg.Drain d)
>>= function
| `Ok () ->
t.draining <- true;
Fut.Promise.future d
| `Closed -> Fut.return ()
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
(** A keyed concurrent executor allows the concurrent execution of work that is
serialized based on its derived keys. This allows a piece of work to
serialize across multiple dimensions. For example, consider a system with
users. When renaming a user, the operation to perform the rename could have
the new username and the old username as keys, guaranteeing no other rename
operations interfer.
Order is guaranteed across keys. For example, consider a queue with 3 items
in it with the following keys: 1) [A], 2) [A, B], 3) [B, C]. Once (1) is
executing, (2) cannot be, however (3) could be, based purely on the
overlapping keys. But (2) would lock a key that (3) depends on. Therefore,
(3) should only run after (2). *)

type enqueue_err = [ `Closed ] [@@deriving show]

module Make (Fut : Abb_intf.Future.S) (Key : Map.OrderedType) : sig
type 'a t

(** Create an executor with the specified number of slots and a function that
knows how to create the list of keys from the work item. *)
val create : slots:int -> ('a -> unit Fut.t) -> 'a t Fut.t

(** Enqueue a piece of work, return immediately. If the queue is draining,
[enqueue] will return [`Closed]. An empty key list does not block on any
other key, including other empty key lists. *)
val enqueue : 'a t -> keys:Key.t list -> 'a -> (unit, [> enqueue_err ]) result Fut.t

(** Destroy the executor immediately. Do not wait for any executing or queued
work to complete. If the executor has already been destroyed, this is a
noop. *)
val destroy : 'a t -> unit Fut.t

(** Prevent the queue from accepting any more work, wait for all work to
complete (queued and in-flight), and destroy the queue. Return when queue
has been destroyed. If the executor has already been destroyed, this is a
noop. *)
val drain_and_destroy : 'a t -> unit Fut.t
end
Loading

0 comments on commit ba78208

Please sign in to comment.