diff --git a/bench/bench_two_stack_queue.ml b/bench/bench_two_stack_queue.ml new file mode 100644 index 00000000..25d0d89f --- /dev/null +++ b/bench/bench_two_stack_queue.ml @@ -0,0 +1,73 @@ +open Multicore_bench +module Queue = Saturn_lockfree.Two_stack_queue + +let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = + let t = Queue.create () in + + let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in + + let init _ = + assert (Queue.length t = 0); + Util.generate_push_and_pop_sequence n_msgs + in + let work _ bits = Util.Bits.iter op bits in + + Times.record ~budgetf ~n_domains:1 ~init ~work () + |> Util.thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" + +let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) + ?(n_msgs = 50 * Util.iter_factor) () = + let n_domains = n_adders + n_takers in + + let t = Queue.create () in + + let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in + + let init _ = + Atomic.set n_msgs_to_take n_msgs; + Atomic.set n_msgs_to_add n_msgs + in + let work i () = + if i < n_adders then + let rec work () = + let n = Util.alloc n_msgs_to_add in + if 0 < n then begin + for i = 1 to n do + Queue.push t i + done; + work () + end + in + work () + else + let rec work () = + let n = Util.alloc n_msgs_to_take in + if n <> 0 then + let rec loop n = + if 0 < n then + loop (n - Bool.to_int (Option.is_some (Queue.pop_opt t))) + else work () + in + loop n + in + work () + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" + (format "nb adder" n_adders) + (format "nb taker" n_takers) + in + + Times.record ~budgetf ~n_domains ~init ~work () + |> Util.thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2 ] [ 1; 2 ] + |> List.concat_map @@ fun (n_adders, n_takers) -> + run_one ~budgetf ~n_adders ~n_takers ()) diff --git a/bench/main.ml b/bench/main.ml index 7cede696..5c6c3864 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -5,6 +5,7 @@ let benchmarks = ("Saturn_lockfree Single_prod_single_cons_queue", Bench_spsc_queue.run_suite); ("Saturn_lockfree Size", Bench_size.run_suite); ("Saturn_lockfree Skiplist", Bench_skiplist.run_suite); + ("Saturn_lockfree Two_stack_queue", Bench_two_stack_queue.run_suite); ("Stdlib Queue", Bench_stdlib_queue.run_suite); ] diff --git a/src/saturn.ml b/src/saturn.ml index ad9de103..1de6c918 100644 --- a/src/saturn.ml +++ b/src/saturn.ml @@ -36,3 +36,4 @@ module Single_prod_single_cons_queue = module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue module Relaxed_queue = Mpmc_relaxed_queue module Skiplist = Saturn_lockfree.Skiplist +module Two_stack_queue = Saturn_lockfree.Two_stack_queue diff --git a/src/saturn.mli b/src/saturn.mli index 4e48698d..3611f688 100644 --- a/src/saturn.mli +++ b/src/saturn.mli @@ -40,3 +40,4 @@ module Single_prod_single_cons_queue = module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue module Relaxed_queue = Mpmc_relaxed_queue module Skiplist = Saturn_lockfree.Skiplist +module Two_stack_queue = Saturn_lockfree.Two_stack_queue diff --git a/src_lockfree/saturn_lockfree.ml b/src_lockfree/saturn_lockfree.ml index 9bd337c9..2bb48e66 100644 --- a/src_lockfree/saturn_lockfree.ml +++ b/src_lockfree/saturn_lockfree.ml @@ -34,3 +34,4 @@ module Single_consumer_queue = Mpsc_queue module Relaxed_queue = Mpmc_relaxed_queue module Size = Size module Skiplist = Skiplist +module Two_stack_queue = Two_stack_queue diff --git a/src_lockfree/saturn_lockfree.mli b/src_lockfree/saturn_lockfree.mli index 5662b6b4..44ad0853 100644 --- a/src_lockfree/saturn_lockfree.mli +++ b/src_lockfree/saturn_lockfree.mli @@ -38,3 +38,4 @@ module Single_consumer_queue = Mpsc_queue module Relaxed_queue = Mpmc_relaxed_queue module Skiplist = Skiplist module Size = Size +module Two_stack_queue = Two_stack_queue diff --git a/src_lockfree/two_stack_queue.ml b/src_lockfree/two_stack_queue.ml new file mode 100644 index 00000000..02018e61 --- /dev/null +++ b/src_lockfree/two_stack_queue.ml @@ -0,0 +1,166 @@ +(* Copyright (c) 2023, Vesa Karvonen + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH + REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY + AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, + INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM + LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR + OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR + PERFORMANCE OF THIS SOFTWARE. *) + +module Atomic = Transparent_atomic + +type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t } + +and ('a, _) tdt = + | Cons : { + counter : int; + value : 'a; + suffix : 'a head; + } + -> ('a, [> `Cons ]) tdt + | Head : { counter : int } -> ('a, [> `Head ]) tdt + | Snoc : { + counter : int; + prefix : 'a tail; + value : 'a; + } + -> ('a, [> `Snoc ]) tdt + | Tail : { + counter : int; + mutable move : ('a, [ `Snoc ]) tdt; + } + -> ('a, [> `Tail ]) tdt + +and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed] +and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed] + +let create () = + let head = + Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as_padded + in + let tail = + Atomic.make (T (Tail { counter = 0; move = Obj.magic () })) + |> Multicore_magic.copy_as_padded + in + { head; tail } |> Multicore_magic.copy_as_padded + +let rec rev (suffix : (_, [< `Cons ]) tdt) = function + | T (Snoc { counter; prefix; value }) -> + rev (Cons { counter; value; suffix = H suffix }) prefix + | T (Tail _) -> suffix + +let rev = function + | (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) -> + rev + (Cons { counter; value; suffix = H (Head { counter = counter + 1 }) }) + prefix + +let rec push backoff t value = function + | T (Snoc snoc_r as snoc) -> push_with backoff t snoc_r.counter (T snoc) value + | T (Tail tail_r as tail) -> + let move = tail_r.move in + if move != Obj.magic () then begin + let (Snoc move_r) = move in + begin + match Atomic.get t.head with + | H (Head head_r as head) when head_r.counter < move_r.counter -> + let after = rev move in + if + Atomic.fenceless_get t.head == H head + && Atomic.compare_and_set t.head (H head) (H after) + then tail_r.move <- Obj.magic () + | _ -> () + end; + let new_tail = Atomic.fenceless_get t.tail in + if new_tail != T tail then push backoff t value new_tail + else push_with backoff t tail_r.counter (T tail) value + end + else push_with backoff t tail_r.counter (T tail) value + +and push_with backoff t counter prefix value = + let after = Snoc { counter = counter + 1; prefix; value } in + let new_tail = Atomic.fenceless_get t.tail in + if new_tail != prefix then push backoff t value new_tail + else if not (Atomic.compare_and_set t.tail prefix (T after)) then + let backoff = Backoff.once backoff in + push backoff t value (Atomic.fenceless_get t.tail) + +let push t value = push Backoff.default t value (Atomic.fenceless_get t.tail) + +exception Empty + +let rec pop backoff t = function + | H (Cons cons_r as cons) -> + if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value + else + let backoff = Backoff.once backoff in + pop backoff t (Atomic.fenceless_get t.head) + | H (Head _ as head) -> begin + match Atomic.fenceless_get t.tail with + | T (Snoc snoc_r as move) -> begin + match snoc_r.prefix with + | T (Tail _ as tail) -> + (*let tail = + Tail { counter = snoc_r.counter - 1; move = Obj.magic () } + in + *) + let new_head = Atomic.get t.head in + if new_head != H head then pop backoff t new_head + else if Atomic.compare_and_set t.tail (T move) (T tail) then + snoc_r.value + else pop backoff t (Atomic.fenceless_get t.head) + | T (Snoc _) -> + let tail = Tail { counter = snoc_r.counter; move } in + let new_head = Atomic.get t.head in + if new_head != H head then pop backoff t new_head + else if Atomic.compare_and_set t.tail (T move) (T tail) then + pop_moving backoff t head move tail + else pop backoff t (Atomic.fenceless_get t.head) + end + | T (Tail tail_r as tail) -> + let move = tail_r.move in + if move == Obj.magic () then pop_emptyish backoff t head + else pop_moving backoff t head move tail + end + +and pop_moving backoff t (Head head_r as head : (_, [< `Head ]) tdt) + (Snoc move_r as move) (Tail tail_r : (_, [< `Tail ]) tdt) = + if head_r.counter < move_r.counter then + match rev move with + | Cons cons_r -> + let after = cons_r.suffix in + let new_head = Atomic.get t.head in + if new_head != H head then pop backoff t new_head + else if Atomic.compare_and_set t.head (H head) after then begin + tail_r.move <- Obj.magic (); + cons_r.value + end + else + let backoff = Backoff.once backoff in + pop backoff t (Atomic.fenceless_get t.head) + else pop_emptyish backoff t head + +and pop_emptyish backoff t head = + let new_head = Atomic.get t.head in + if new_head == H head then raise_notrace Empty else pop backoff t new_head + +let pop t = pop Backoff.default t (Atomic.fenceless_get t.head) +let pop_opt t = match pop t with value -> Some value | exception Empty -> None + +let rec length t = + let head = Atomic.get t.head in + let tail = Atomic.fenceless_get t.tail in + if head != Atomic.get t.head then length t + else + let head_at = + match head with H (Cons r) -> r.counter | H (Head r) -> r.counter + in + let tail_at = + match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter + in + tail_at - head_at + 1 diff --git a/src_lockfree/two_stack_queue.mli b/src_lockfree/two_stack_queue.mli new file mode 100644 index 00000000..b85b3459 --- /dev/null +++ b/src_lockfree/two_stack_queue.mli @@ -0,0 +1,20 @@ +type !'a t +(** *) + +val create : unit -> 'a t +(** *) + +val push : 'a t -> 'a -> unit +(** *) + +exception Empty +(** Raised by {!pop} in case the queue is empty. *) + +val pop : 'a t -> 'a +(** *) + +val pop_opt : 'a t -> 'a option +(** *) + +val length : 'a t -> int +(** *) diff --git a/test/two_stack_queue/dune b/test/two_stack_queue/dune new file mode 100644 index 00000000..7ac3bc53 --- /dev/null +++ b/test/two_stack_queue/dune @@ -0,0 +1,13 @@ +(test + (package saturn_lockfree) + (name stm_two_stack_queue) + (modules stm_two_stack_queue) + (libraries + saturn_lockfree + qcheck-core + qcheck-core.runner + qcheck-stm.stm + qcheck-stm.sequential + qcheck-stm.domain) + (action + (run %{test} --verbose))) diff --git a/test/two_stack_queue/stm_two_stack_queue.ml b/test/two_stack_queue/stm_two_stack_queue.ml new file mode 100644 index 00000000..21caec46 --- /dev/null +++ b/test/two_stack_queue/stm_two_stack_queue.ml @@ -0,0 +1,81 @@ +module Queue = Saturn_lockfree.Two_stack_queue + +let () = + let q = Queue.create () in + Queue.push q 101; + Queue.push q 42; + assert (Queue.pop_opt q = Some 101); + Queue.push q 76; + assert (Queue.pop_opt q = Some 42); + assert (Queue.pop_opt q = Some 76); + assert (Queue.pop_opt q = None) + +module Queue_spec = struct + type cmd = Push of int | Pop_opt | Length + + let show_cmd = function + | Push x -> "Push " ^ string_of_int x + | Pop_opt -> "Pop_opt" + | Length -> "Length" + + module State = struct + type t = int list * int list + + let push x (h, t) = if h == [] then ([ x ], []) else (h, x :: t) + let peek_opt (h, _) = match h with x :: _ -> Some x | [] -> None + let length (h, t) = List.length h + List.length t + + let drop ((h, t) as s) = + match h with [] -> s | [ _ ] -> (List.rev t, []) | _ :: h -> (h, t) + end + + type state = State.t + type sut = int Queue.t + + let arb_cmd _s = + let open QCheck in + [ + Gen.int_range 1 10000 |> Gen.map (fun x -> Push x); + Gen.return Pop_opt; + Gen.return Length; + ] + |> Gen.oneof |> make ~print:show_cmd + + let init_state = ([], []) + let init_sut () = Queue.create () + let cleanup _ = () + + let next_state c s = + match c with + | Push x -> State.push x s + | Pop_opt -> State.drop s + | Length -> s + + let precond _ _ = true + + let run c d = + let open STM in + match c with + | Push x -> Res (unit, Queue.push d x) + | Pop_opt -> Res (option int, Queue.pop_opt d) + | Length -> Res (int, Queue.length d) + + let postcond c (s : state) res = + let open STM in + match (c, res) with + | Push _x, Res ((Unit, _), ()) -> true + | Pop_opt, Res ((Option Int, _), res) -> res = State.peek_opt s + | Length, Res ((Int, _), res) -> res = State.length s + | _, _ -> false +end + +module Queue_seq = STM_sequential.Make (Queue_spec) +module Queue_par = STM_domain.Make (Queue_spec) + +let () = + let count = 1000 in + QCheck_base_runner.run_tests_main + [ + Queue_seq.agree_test ~count ~name:"STM Two_stack_queue test sequential"; + Queue_par.agree_test_par ~count ~name:"STM Two_stack_queue test parallel"; + ]