Skip to content

Commit

Permalink
Change to use Picos for scheduler interop
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Oct 25, 2023
1 parent 4c16f6b commit f33bd10
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 222 deletions.
2 changes: 1 addition & 1 deletion .ocamlformat
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
profile = default
version = 0.26.0
version = 0.26.1

exp-grouping=preserve
27 changes: 0 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -613,33 +613,6 @@ argument for potentially blocking operations. For example, to perform a blocking
pop with a timeout, one can simply explicitly pass the desired timeout in
seconds:

```ocaml
# let an_empty_stack = stack () in
Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack }
Exception: Failure "Domain_local_timeout.set_timeoutf not implemented".
```

Oops! What happened above is that the
[_domain local timeout_](https://github.com/ocaml-multicore/domain-local-timeout)
mechanism used by **Kcas** was not implemented on the current domain. The idea
is that, in the future, concurrent schedulers provide the mechanism out of the
box, but there is also a default implementation using the Stdlib `Thread` and
`Unix` modules that works on most platforms. However, to avoid direct
dependencies to `Thread` and `Unix`, we need to explicitly tell the library that
it can use those modules:

```ocaml
# Domain_local_timeout.set_system (module Thread) (module Unix)
- : unit = ()
```

This initialization, if needed, should be done by application code rather than
by libraries.

If we now retry the previous example we will get a
[`Timeout`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Timeout/index.html#exception-Timeout)
exception as expected:

```ocaml
# let an_empty_stack = stack () in
Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack }
Expand Down
34 changes: 4 additions & 30 deletions bench/bench.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,11 @@ module Times = struct
results.(domain_i)
done
in
let prepare_for_await () =
let open struct
type state = Init | Released | Awaiting of { mutable released : bool }
end in
let state = Atomic.make Init in
let release () =
if Multicore_magic.fenceless_get state != Released then
match Atomic.exchange state Released with
| Awaiting r -> r.released <- true
| _ -> ()
in
let await () =
if Multicore_magic.fenceless_get state != Released then
let awaiting = Awaiting { released = false } in
if Atomic.compare_and_set state Init awaiting then
match awaiting with
| Awaiting r ->
(* Avoid sleeping *)
while not r.released do
Domain.cpu_relax ()
done
| _ -> ()
in
Domain_local_await.{ release; await }
let domains =
Array.init n_domains @@ fun domain_i ->
Domain.spawn @@ fun () -> main domain_i
in
Domain_local_await.using ~prepare_for_await ~while_running:(fun () ->
let domains =
Array.init n_domains @@ fun domain_i ->
Domain.spawn @@ fun () -> main domain_i
in
Array.iter Domain.join domains);
Array.iter Domain.join domains;
let n = Stack.length results.(0) in
let times = Array.create_float n in
for run_i = 0 to n - 1 do
Expand Down
1 change: 0 additions & 1 deletion bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
(package kcas_data)
(libraries
kcas_data
domain-local-await
multicore-magic
yojson
domain_shims
Expand Down
36 changes: 11 additions & 25 deletions doc/scheduler-interop.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ implementations that are conveniently provided by
```ocaml
# #thread
# #require "kcas_data"
# #require "picos"
# open Picos
# open Kcas_data
# open Kcas
```
Expand All @@ -36,45 +38,29 @@ module Scheduler : sig
val fiber : t -> (unit -> 'a) -> 'a Promise.t
end = struct
open Effect.Deep
type _ Effect.t +=
| Suspend : (('a, unit) continuation -> unit) -> 'a Effect.t
type t = {
queue: (unit -> unit) Queue.t;
domain: unit Domain.t
}
let spawn () =
let queue = Queue.create () in
let queue: (unit -> unit) Queue.t = Queue.create () in
let rec scheduler work =
let effc (type a) : a Effect.t -> _ = function
| Suspend ef -> Some ef
| _ -> None in
| Trigger.Await release ->
Some (fun (k: (a, _) continuation) ->
if not (Trigger.on_signal release () () @@ fun _ () () ->
Queue.add (fun () -> continue k None) queue) then
continue k None)
| _ ->
None in
try_with work () { effc };
match Queue.take_opt queue with
| Some work -> scheduler work
| None -> () in
let prepare_for_await _ =
let state = Atomic.make `Init in
let release () =
if Atomic.get state != `Released then
match Atomic.exchange state `Released with
| `Awaiting k ->
Queue.add (continue k) queue
| _ -> () in
let await () =
if Atomic.get state != `Released then
Effect.perform @@ Suspend (fun k ->
if not (Atomic.compare_and_set state `Init
(`Awaiting k)) then
continue k ())
in
Domain_local_await.{ release; await } in
let domain = Domain.spawn @@ fun () ->
try
while true do
let work = Queue.take_blocking queue in
Domain_local_await.using
~prepare_for_await
~while_running:(fun () -> scheduler work)
scheduler (Queue.take_blocking queue)
done
with Exit -> () in
{ queue; domain }
Expand Down
5 changes: 2 additions & 3 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
(depends
(ocaml (>= 4.13.0))
(backoff (>= 0.1.0))
(domain-local-await (>= 1.0.0))
(domain-local-timeout (>= 1.0.0))
picos
(multicore-magic (>= 2.0.0))
(domain_shims (and (>= 0.1.0) :with-test))
(alcotest (and (>= 1.7.0) :with-test))
Expand All @@ -26,7 +25,7 @@
(depends
(kcas (= :version))
(multicore-magic (>= 2.0.0))
(domain-local-await (and (>= 1.0.0) :with-test))
(picos :with-test)
(domain_shims (and (>= 0.1.0) :with-test))
(mtime (and (>= 2.0.0) :with-test))
(alcotest (and (>= 1.7.0) :with-test))
Expand Down
6 changes: 4 additions & 2 deletions kcas.opam
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ depends: [
"dune" {>= "3.8"}
"ocaml" {>= "4.13.0"}
"backoff" {>= "0.1.0"}
"domain-local-await" {>= "1.0.0"}
"domain-local-timeout" {>= "1.0.0"}
"picos"
"multicore-magic" {>= "2.0.0"}
"domain_shims" {>= "0.1.0" & with-test}
"alcotest" {>= "1.7.0" & with-test}
Expand All @@ -40,3 +39,6 @@ build: [
]
dev-repo: "git+https://github.com/ocaml-multicore/kcas.git"
doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/"
pin-depends: [
[ "picos.dev" "git+https://github.com/ocaml-multicore/picos#eb36ad73665f120fb2bcfd441cc364146aaf489a" ]
]
3 changes: 3 additions & 0 deletions kcas.opam.template
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/"
pin-depends: [
[ "picos.dev" "git+https://github.com/ocaml-multicore/picos#eb36ad73665f120fb2bcfd441cc364146aaf489a" ]
]
2 changes: 1 addition & 1 deletion kcas_data.opam
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ depends: [
"dune" {>= "3.8"}
"kcas" {= version}
"multicore-magic" {>= "2.0.0"}
"domain-local-await" {>= "1.0.0" & with-test}
"picos" {with-test}
"domain_shims" {>= "0.1.0" & with-test}
"mtime" {>= "2.0.0" & with-test}
"alcotest" {>= "1.7.0" & with-test}
Expand Down
2 changes: 1 addition & 1 deletion src/kcas/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(library
(name kcas)
(public_name kcas)
(libraries domain-local-await domain-local-timeout backoff multicore-magic))
(libraries picos backoff multicore-magic))

(mdx
(package kcas)
Expand Down
Loading

0 comments on commit f33bd10

Please sign in to comment.