Skip to content

Commit

Permalink
MPMC 2-stack queue
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Dec 20, 2023
1 parent 0668f83 commit cf2beeb
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 1 deletion.
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
(ocaml (>= 4.13))
(domain_shims (and (>= 0.1.0) :with-test))
(backoff (>= 0.1.0))
(multicore-magic (>= 2.0.0))
(alcotest (and (>= 1.7.0) :with-test))
(qcheck (and (>= 0.21.3) :with-test))
(qcheck-core (and (>= 0.21.3) :with-test))
Expand Down
1 change: 1 addition & 0 deletions saturn_lockfree.opam
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ depends: [
"ocaml" {>= "4.13"}
"domain_shims" {>= "0.1.0" & with-test}
"backoff" {>= "0.1.0"}
"multicore-magic" {>= "2.0.0"}
"alcotest" {>= "1.7.0" & with-test}
"qcheck" {>= "0.21.3" & with-test}
"qcheck-core" {>= "0.21.3" & with-test}
Expand Down
1 change: 1 addition & 0 deletions src/saturn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ module Single_prod_single_cons_queue =

module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Two_stack_queue = Saturn_lockfree.Two_stack_queue
1 change: 1 addition & 0 deletions src/saturn.mli
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ module Single_prod_single_cons_queue =

module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Two_stack_queue = Saturn_lockfree.Two_stack_queue
2 changes: 1 addition & 1 deletion src_lockfree/dune
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ let () =
(library
(name saturn_lockfree)
(public_name saturn_lockfree)
(libraries backoff |}
(libraries backoff multicore-magic |}
^ maybe_threads
^ {| ))

Expand Down
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ module Work_stealing_deque = Ws_deque
module Single_prod_single_cons_queue = Spsc_queue
module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Two_stack_queue = Two_stack_queue
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.mli
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ module Work_stealing_deque = Ws_deque
module Single_prod_single_cons_queue = Spsc_queue
module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Two_stack_queue = Two_stack_queue
145 changes: 145 additions & 0 deletions src_lockfree/two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
let atomic_get x = Atomic.get (Sys.opaque_identity x)

type 'a t = { head : 'a head_pack Atomic.t; tail : 'a tail_pack Atomic.t }

and ('a, _) head =
| Cons : {
counter : int;
value : 'a;
suffix : 'a head_pack;
}
-> ('a, [> `Cons ]) head
| Head : { counter : int } -> ('a, [> `Head ]) head

and 'a head_pack = H : ('a, [< `Cons | `Head ]) head -> 'a head_pack
[@@unboxed]

and ('a, _) tail =
| Snoc : {
counter : int;
prefix : 'a tail_pack;
value : 'a;
}
-> ('a, [> `Snoc ]) tail
| Tail : {
counter : int;
mutable move : ('a, [ `Snoc ]) tail;
}
-> ('a, [> `Tail ]) tail

and 'a tail_pack = T : ('a, [< `Snoc | `Tail ]) tail -> 'a tail_pack
[@@unboxed]

let create () =
let head =
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as_padded
in
let tail =
Atomic.make (T (Tail { counter = 0; move = Obj.magic () }))
|> Multicore_magic.copy_as_padded
in
{ head; tail } |> Multicore_magic.copy_as_padded

let rec rev (suffix : (_, [< `Cons ]) head) = function
| T (Snoc { counter; prefix; value }) ->
rev (Cons { counter; value; suffix = H suffix }) prefix
| T (Tail _) -> suffix

let rev = function
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tail) ->
rev
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

let counter_of_head = function (Head r : (_, [< `Head ]) head) -> r.counter
let counter_of_snoc = function (Snoc r : (_, [< `Snoc ]) tail) -> r.counter
let counter_of_tail = function (Tail r : (_, [< `Tail ]) tail) -> r.counter

let clear_move = function
| (Tail tail_r : (_, [< `Tail ]) tail) -> tail_r.move <- Obj.magic ()

let rec push backoff t value =
match atomic_get t.tail with
| T (Snoc snoc_r as snoc) -> push_with backoff t snoc_r.counter (T snoc) value
| T (Tail tail_r as tail) ->
let move = tail_r.move in
if move != Obj.magic () then
match atomic_get t.head with
| H (Head _ as head) when counter_of_head head < counter_of_snoc move ->
let after = rev move in
if Atomic.compare_and_set t.head (H head) (H after) then
clear_move tail;
push backoff t value
| _ -> push_with backoff t (counter_of_tail tail) (T tail) value
else push_with backoff t (counter_of_tail tail) (T tail) value

and push_with backoff t counter prefix value =
let after = Snoc { counter = counter + 1; prefix; value } in
if not (Atomic.compare_and_set t.tail prefix (T after)) then
push (Backoff.once backoff) t value

let push t value = push Backoff.default t value
let is_tail = function T (Tail _) -> true | T (Snoc _) -> false

exception Empty

let rec pop backoff t =
match atomic_get t.head with
| H (Cons cons_r as cons) ->
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
else pop (Backoff.once backoff) t
| H (Head _ as head) -> begin
match atomic_get t.tail with
| T (Snoc snoc_r as move) ->
if is_tail snoc_r.prefix then begin
let tail =
Tail { counter = snoc_r.counter - 1; move = Obj.magic () }
in
if
atomic_get t.head == H head
&& Atomic.compare_and_set t.tail (T move) (T tail)
then snoc_r.value
else pop backoff t
end
else
let tail = Tail { counter = snoc_r.counter; move } in
if
atomic_get t.head == H head
&& Atomic.compare_and_set t.tail (T move) (T tail)
then pop_moving backoff t head move tail
else pop backoff t
| T (Tail tail_r as tail) ->
let move = tail_r.move in
if move == Obj.magic () then pop_emptyish backoff t head
else pop_moving backoff t head move tail
end

and pop_moving backoff t head move tail =
if counter_of_head head < counter_of_snoc move then
match rev move with
| Cons cons_r ->
if Atomic.compare_and_set t.head (H head) cons_r.suffix then begin
clear_move tail;
cons_r.value
end
else pop backoff t
else pop_emptyish backoff t head

and pop_emptyish backoff t head =
if atomic_get t.head == H head then raise_notrace Empty else pop backoff t

let pop t = pop Backoff.default t
let pop_opt t = match pop t with value -> Some value | exception Empty -> None

let rec length t =
let head = atomic_get t.head in
let tail = atomic_get t.tail in
if head != atomic_get t.head then length t
else
let head_at =
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
in
let tail_at =
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
in
tail_at - head_at + 1
20 changes: 20 additions & 0 deletions src_lockfree/two_stack_queue.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
type !'a t
(** *)

val create : unit -> 'a t
(** *)

val push : 'a t -> 'a -> unit
(** *)

exception Empty
(** *)

val pop : 'a t -> 'a
(** *)

val pop_opt : 'a t -> 'a option
(** *)

val length : 'a t -> int
(** *)
13 changes: 13 additions & 0 deletions test/two_stack_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
(test
(package saturn_lockfree)
(name stm_two_stack_queue)
(modules stm_two_stack_queue)
(libraries
saturn_lockfree
qcheck-core
qcheck-core.runner
qcheck-stm.stm
qcheck-stm.sequential
qcheck-stm.domain)
(action
(run %{test} --verbose)))
83 changes: 83 additions & 0 deletions test/two_stack_queue/stm_two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
module Two_stack_queue = Saturn_lockfree.Two_stack_queue

let () =
let q = Two_stack_queue.create () in
Two_stack_queue.push q 101;
Two_stack_queue.push q 42;
assert (Two_stack_queue.pop_opt q = Some 101);
Two_stack_queue.push q 76;
assert (Two_stack_queue.pop_opt q = Some 42);
assert (Two_stack_queue.pop_opt q = Some 76);
assert (Two_stack_queue.pop_opt q = None)

module Two_stack_queue_spec = struct
type cmd = Push of int | Pop_opt | Length

let show_cmd = function
| Push x -> "Push " ^ string_of_int x
| Pop_opt -> "Pop_opt"
| Length -> "Length"

module State = struct
type t = int list * int list

let push x (h, t) = if h == [] then ([ x ], []) else (h, x :: t)
let peek_opt (h, _) = match h with x :: _ -> Some x | [] -> None
let length (h, t) = List.length h + List.length t

let drop ((h, t) as s) =
match h with [] -> s | [ _ ] -> (List.rev t, []) | _ :: h -> (h, t)
end

type state = State.t
type sut = int Two_stack_queue.t

let arb_cmd _s =
let open QCheck in
[
Gen.int_range 1 10000 |> Gen.map (fun x -> Push x);
Gen.return Pop_opt;
Gen.return Length;
]
|> Gen.oneof |> make ~print:show_cmd

let init_state = ([], [])
let init_sut () = Two_stack_queue.create ()
let cleanup _ = ()

let next_state c s =
match c with
| Push x -> State.push x s
| Pop_opt -> State.drop s
| Length -> s

let precond _ _ = true

let run c d =
let open STM in
match c with
| Push x -> Res (unit, Two_stack_queue.push d x)
| Pop_opt -> Res (option int, Two_stack_queue.pop_opt d)
| Length -> Res (int, Two_stack_queue.length d)

let postcond c (s : state) res =
let open STM in
match (c, res) with
| Push _x, Res ((Unit, _), ()) -> true
| Pop_opt, Res ((Option Int, _), res) -> res = State.peek_opt s
| Length, Res ((Int, _), res) -> res = State.length s
| _, _ -> false
end

module Two_stack_queue_seq = STM_sequential.Make (Two_stack_queue_spec)
module Two_stack_queue_par = STM_domain.Make (Two_stack_queue_spec)

let () =
let count = 1000 in
QCheck_base_runner.run_tests_main
[
Two_stack_queue_seq.agree_test ~count
~name:"STM Two_stack_queue test sequential";
Two_stack_queue_par.agree_test_par ~count
~name:"STM Two_stack_queue test parallel";
]

0 comments on commit cf2beeb

Please sign in to comment.