From cf2beeb1ae5361806010a68b19320ab4dadd5600 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Mon, 11 Dec 2023 13:36:01 +0200 Subject: [PATCH] MPMC 2-stack queue --- dune-project | 1 + saturn_lockfree.opam | 1 + src/saturn.ml | 1 + src/saturn.mli | 1 + src_lockfree/dune | 2 +- src_lockfree/saturn_lockfree.ml | 1 + src_lockfree/saturn_lockfree.mli | 1 + src_lockfree/two_stack_queue.ml | 145 ++++++++++++++++++++ src_lockfree/two_stack_queue.mli | 20 +++ test/two_stack_queue/dune | 13 ++ test/two_stack_queue/stm_two_stack_queue.ml | 83 +++++++++++ 11 files changed, 268 insertions(+), 1 deletion(-) create mode 100644 src_lockfree/two_stack_queue.ml create mode 100644 src_lockfree/two_stack_queue.mli create mode 100644 test/two_stack_queue/dune create mode 100644 test/two_stack_queue/stm_two_stack_queue.ml diff --git a/dune-project b/dune-project index a5c6dba1..0bd92bb4 100644 --- a/dune-project +++ b/dune-project @@ -27,6 +27,7 @@ (ocaml (>= 4.13)) (domain_shims (and (>= 0.1.0) :with-test)) (backoff (>= 0.1.0)) + (multicore-magic (>= 2.0.0)) (alcotest (and (>= 1.7.0) :with-test)) (qcheck (and (>= 0.21.3) :with-test)) (qcheck-core (and (>= 0.21.3) :with-test)) diff --git a/saturn_lockfree.opam b/saturn_lockfree.opam index 8b0c8ed3..353f803d 100644 --- a/saturn_lockfree.opam +++ b/saturn_lockfree.opam @@ -12,6 +12,7 @@ depends: [ "ocaml" {>= "4.13"} "domain_shims" {>= "0.1.0" & with-test} "backoff" {>= "0.1.0"} + "multicore-magic" {>= "2.0.0"} "alcotest" {>= "1.7.0" & with-test} "qcheck" {>= "0.21.3" & with-test} "qcheck-core" {>= "0.21.3" & with-test} diff --git a/src/saturn.ml b/src/saturn.ml index 10c6e7c7..da0918ea 100644 --- a/src/saturn.ml +++ b/src/saturn.ml @@ -35,3 +35,4 @@ module Single_prod_single_cons_queue = module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue module Relaxed_queue = Mpmc_relaxed_queue +module Two_stack_queue = Saturn_lockfree.Two_stack_queue diff --git a/src/saturn.mli b/src/saturn.mli index d1b5eaf0..e74855e4 100644 --- a/src/saturn.mli +++ b/src/saturn.mli @@ -39,3 +39,4 @@ module Single_prod_single_cons_queue = module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue module Relaxed_queue = Mpmc_relaxed_queue +module Two_stack_queue = Saturn_lockfree.Two_stack_queue diff --git a/src_lockfree/dune b/src_lockfree/dune index a1d7cc0e..dbfaff3b 100644 --- a/src_lockfree/dune +++ b/src_lockfree/dune @@ -10,7 +10,7 @@ let () = (library (name saturn_lockfree) (public_name saturn_lockfree) - (libraries backoff |} + (libraries backoff multicore-magic |} ^ maybe_threads ^ {| )) diff --git a/src_lockfree/saturn_lockfree.ml b/src_lockfree/saturn_lockfree.ml index d6cd1d35..dd528496 100644 --- a/src_lockfree/saturn_lockfree.ml +++ b/src_lockfree/saturn_lockfree.ml @@ -32,3 +32,4 @@ module Work_stealing_deque = Ws_deque module Single_prod_single_cons_queue = Spsc_queue module Single_consumer_queue = Mpsc_queue module Relaxed_queue = Mpmc_relaxed_queue +module Two_stack_queue = Two_stack_queue diff --git a/src_lockfree/saturn_lockfree.mli b/src_lockfree/saturn_lockfree.mli index 4adea516..0a3f6891 100644 --- a/src_lockfree/saturn_lockfree.mli +++ b/src_lockfree/saturn_lockfree.mli @@ -36,3 +36,4 @@ module Work_stealing_deque = Ws_deque module Single_prod_single_cons_queue = Spsc_queue module Single_consumer_queue = Mpsc_queue module Relaxed_queue = Mpmc_relaxed_queue +module Two_stack_queue = Two_stack_queue diff --git a/src_lockfree/two_stack_queue.ml b/src_lockfree/two_stack_queue.ml new file mode 100644 index 00000000..67cf0d73 --- /dev/null +++ b/src_lockfree/two_stack_queue.ml @@ -0,0 +1,145 @@ +let atomic_get x = Atomic.get (Sys.opaque_identity x) + +type 'a t = { head : 'a head_pack Atomic.t; tail : 'a tail_pack Atomic.t } + +and ('a, _) head = + | Cons : { + counter : int; + value : 'a; + suffix : 'a head_pack; + } + -> ('a, [> `Cons ]) head + | Head : { counter : int } -> ('a, [> `Head ]) head + +and 'a head_pack = H : ('a, [< `Cons | `Head ]) head -> 'a head_pack +[@@unboxed] + +and ('a, _) tail = + | Snoc : { + counter : int; + prefix : 'a tail_pack; + value : 'a; + } + -> ('a, [> `Snoc ]) tail + | Tail : { + counter : int; + mutable move : ('a, [ `Snoc ]) tail; + } + -> ('a, [> `Tail ]) tail + +and 'a tail_pack = T : ('a, [< `Snoc | `Tail ]) tail -> 'a tail_pack +[@@unboxed] + +let create () = + let head = + Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as_padded + in + let tail = + Atomic.make (T (Tail { counter = 0; move = Obj.magic () })) + |> Multicore_magic.copy_as_padded + in + { head; tail } |> Multicore_magic.copy_as_padded + +let rec rev (suffix : (_, [< `Cons ]) head) = function + | T (Snoc { counter; prefix; value }) -> + rev (Cons { counter; value; suffix = H suffix }) prefix + | T (Tail _) -> suffix + +let rev = function + | (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tail) -> + rev + (Cons { counter; value; suffix = H (Head { counter = counter + 1 }) }) + prefix + +let counter_of_head = function (Head r : (_, [< `Head ]) head) -> r.counter +let counter_of_snoc = function (Snoc r : (_, [< `Snoc ]) tail) -> r.counter +let counter_of_tail = function (Tail r : (_, [< `Tail ]) tail) -> r.counter + +let clear_move = function + | (Tail tail_r : (_, [< `Tail ]) tail) -> tail_r.move <- Obj.magic () + +let rec push backoff t value = + match atomic_get t.tail with + | T (Snoc snoc_r as snoc) -> push_with backoff t snoc_r.counter (T snoc) value + | T (Tail tail_r as tail) -> + let move = tail_r.move in + if move != Obj.magic () then + match atomic_get t.head with + | H (Head _ as head) when counter_of_head head < counter_of_snoc move -> + let after = rev move in + if Atomic.compare_and_set t.head (H head) (H after) then + clear_move tail; + push backoff t value + | _ -> push_with backoff t (counter_of_tail tail) (T tail) value + else push_with backoff t (counter_of_tail tail) (T tail) value + +and push_with backoff t counter prefix value = + let after = Snoc { counter = counter + 1; prefix; value } in + if not (Atomic.compare_and_set t.tail prefix (T after)) then + push (Backoff.once backoff) t value + +let push t value = push Backoff.default t value +let is_tail = function T (Tail _) -> true | T (Snoc _) -> false + +exception Empty + +let rec pop backoff t = + match atomic_get t.head with + | H (Cons cons_r as cons) -> + if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value + else pop (Backoff.once backoff) t + | H (Head _ as head) -> begin + match atomic_get t.tail with + | T (Snoc snoc_r as move) -> + if is_tail snoc_r.prefix then begin + let tail = + Tail { counter = snoc_r.counter - 1; move = Obj.magic () } + in + if + atomic_get t.head == H head + && Atomic.compare_and_set t.tail (T move) (T tail) + then snoc_r.value + else pop backoff t + end + else + let tail = Tail { counter = snoc_r.counter; move } in + if + atomic_get t.head == H head + && Atomic.compare_and_set t.tail (T move) (T tail) + then pop_moving backoff t head move tail + else pop backoff t + | T (Tail tail_r as tail) -> + let move = tail_r.move in + if move == Obj.magic () then pop_emptyish backoff t head + else pop_moving backoff t head move tail + end + +and pop_moving backoff t head move tail = + if counter_of_head head < counter_of_snoc move then + match rev move with + | Cons cons_r -> + if Atomic.compare_and_set t.head (H head) cons_r.suffix then begin + clear_move tail; + cons_r.value + end + else pop backoff t + else pop_emptyish backoff t head + +and pop_emptyish backoff t head = + if atomic_get t.head == H head then raise_notrace Empty else pop backoff t + +let pop t = pop Backoff.default t +let pop_opt t = match pop t with value -> Some value | exception Empty -> None + +let rec length t = + let head = atomic_get t.head in + let tail = atomic_get t.tail in + if head != atomic_get t.head then length t + else + let head_at = + match head with H (Cons r) -> r.counter | H (Head r) -> r.counter + in + let tail_at = + match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter + in + tail_at - head_at + 1 diff --git a/src_lockfree/two_stack_queue.mli b/src_lockfree/two_stack_queue.mli new file mode 100644 index 00000000..910ffbaf --- /dev/null +++ b/src_lockfree/two_stack_queue.mli @@ -0,0 +1,20 @@ +type !'a t +(** *) + +val create : unit -> 'a t +(** *) + +val push : 'a t -> 'a -> unit +(** *) + +exception Empty +(** *) + +val pop : 'a t -> 'a +(** *) + +val pop_opt : 'a t -> 'a option +(** *) + +val length : 'a t -> int +(** *) diff --git a/test/two_stack_queue/dune b/test/two_stack_queue/dune new file mode 100644 index 00000000..7ac3bc53 --- /dev/null +++ b/test/two_stack_queue/dune @@ -0,0 +1,13 @@ +(test + (package saturn_lockfree) + (name stm_two_stack_queue) + (modules stm_two_stack_queue) + (libraries + saturn_lockfree + qcheck-core + qcheck-core.runner + qcheck-stm.stm + qcheck-stm.sequential + qcheck-stm.domain) + (action + (run %{test} --verbose))) diff --git a/test/two_stack_queue/stm_two_stack_queue.ml b/test/two_stack_queue/stm_two_stack_queue.ml new file mode 100644 index 00000000..d4a7d842 --- /dev/null +++ b/test/two_stack_queue/stm_two_stack_queue.ml @@ -0,0 +1,83 @@ +module Two_stack_queue = Saturn_lockfree.Two_stack_queue + +let () = + let q = Two_stack_queue.create () in + Two_stack_queue.push q 101; + Two_stack_queue.push q 42; + assert (Two_stack_queue.pop_opt q = Some 101); + Two_stack_queue.push q 76; + assert (Two_stack_queue.pop_opt q = Some 42); + assert (Two_stack_queue.pop_opt q = Some 76); + assert (Two_stack_queue.pop_opt q = None) + +module Two_stack_queue_spec = struct + type cmd = Push of int | Pop_opt | Length + + let show_cmd = function + | Push x -> "Push " ^ string_of_int x + | Pop_opt -> "Pop_opt" + | Length -> "Length" + + module State = struct + type t = int list * int list + + let push x (h, t) = if h == [] then ([ x ], []) else (h, x :: t) + let peek_opt (h, _) = match h with x :: _ -> Some x | [] -> None + let length (h, t) = List.length h + List.length t + + let drop ((h, t) as s) = + match h with [] -> s | [ _ ] -> (List.rev t, []) | _ :: h -> (h, t) + end + + type state = State.t + type sut = int Two_stack_queue.t + + let arb_cmd _s = + let open QCheck in + [ + Gen.int_range 1 10000 |> Gen.map (fun x -> Push x); + Gen.return Pop_opt; + Gen.return Length; + ] + |> Gen.oneof |> make ~print:show_cmd + + let init_state = ([], []) + let init_sut () = Two_stack_queue.create () + let cleanup _ = () + + let next_state c s = + match c with + | Push x -> State.push x s + | Pop_opt -> State.drop s + | Length -> s + + let precond _ _ = true + + let run c d = + let open STM in + match c with + | Push x -> Res (unit, Two_stack_queue.push d x) + | Pop_opt -> Res (option int, Two_stack_queue.pop_opt d) + | Length -> Res (int, Two_stack_queue.length d) + + let postcond c (s : state) res = + let open STM in + match (c, res) with + | Push _x, Res ((Unit, _), ()) -> true + | Pop_opt, Res ((Option Int, _), res) -> res = State.peek_opt s + | Length, Res ((Int, _), res) -> res = State.length s + | _, _ -> false +end + +module Two_stack_queue_seq = STM_sequential.Make (Two_stack_queue_spec) +module Two_stack_queue_par = STM_domain.Make (Two_stack_queue_spec) + +let () = + let count = 1000 in + QCheck_base_runner.run_tests_main + [ + Two_stack_queue_seq.agree_test ~count + ~name:"STM Two_stack_queue test sequential"; + Two_stack_queue_par.agree_test_par ~count + ~name:"STM Two_stack_queue test parallel"; + ]