Skip to content

Commit

Permalink
MPMC 2-stack queue
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Jan 16, 2024
1 parent 61c5059 commit e99cf2a
Show file tree
Hide file tree
Showing 10 changed files with 358 additions and 0 deletions.
73 changes: 73 additions & 0 deletions bench/bench_two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
open Multicore_bench
module Queue = Saturn_lockfree.Two_stack_queue

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create () in

let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in

let init _ =
assert (Queue.length t = 0);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Util.thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 50 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Queue.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then
let rec loop n =
if 0 < n then
loop (n - Bool.to_int (Option.is_some (Queue.pop_opt t)))
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
|> Util.thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ let benchmarks =
("Saturn_lockfree Single_prod_single_cons_queue", Bench_spsc_queue.run_suite);
("Saturn_lockfree Size", Bench_size.run_suite);
("Saturn_lockfree Skiplist", Bench_skiplist.run_suite);
("Saturn_lockfree Two_stack_queue", Bench_two_stack_queue.run_suite);
("Stdlib Queue", Bench_stdlib_queue.run_suite);
]

Expand Down
1 change: 1 addition & 0 deletions src/saturn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ module Single_prod_single_cons_queue =
module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist
module Two_stack_queue = Saturn_lockfree.Two_stack_queue
1 change: 1 addition & 0 deletions src/saturn.mli
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ module Single_prod_single_cons_queue =
module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist
module Two_stack_queue = Saturn_lockfree.Two_stack_queue
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Size = Size
module Skiplist = Skiplist
module Two_stack_queue = Two_stack_queue
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.mli
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Skiplist
module Size = Size
module Two_stack_queue = Two_stack_queue
166 changes: 166 additions & 0 deletions src_lockfree/two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
(* Copyright (c) 2023, Vesa Karvonen <[email protected]>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE. *)

module Atomic = Transparent_atomic

type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }

and ('a, _) tdt =
| Cons : {
counter : int;
value : 'a;
suffix : 'a head;
}
-> ('a, [> `Cons ]) tdt
| Head : { counter : int } -> ('a, [> `Head ]) tdt
| Snoc : {
counter : int;
prefix : 'a tail;
value : 'a;
}
-> ('a, [> `Snoc ]) tdt
| Tail : {
counter : int;
mutable move : ('a, [ `Snoc ]) tdt;
}
-> ('a, [> `Tail ]) tdt

and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]

let create () =
let head =
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as_padded
in
let tail =
Atomic.make (T (Tail { counter = 0; move = Obj.magic () }))
|> Multicore_magic.copy_as_padded
in
{ head; tail } |> Multicore_magic.copy_as_padded

let rec rev (suffix : (_, [< `Cons ]) tdt) = function
| T (Snoc { counter; prefix; value }) ->
rev (Cons { counter; value; suffix = H suffix }) prefix
| T (Tail _) -> suffix

let rev = function
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
rev
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

let rec push backoff t value = function
| T (Snoc snoc_r as snoc) -> push_with backoff t snoc_r.counter (T snoc) value
| T (Tail tail_r as tail) ->
let move = tail_r.move in
if move != Obj.magic () then begin
let (Snoc move_r) = move in
begin
match Atomic.get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter ->
let after = rev move in
if
Atomic.fenceless_get t.head == H head
&& Atomic.compare_and_set t.head (H head) (H after)
then tail_r.move <- Obj.magic ()
| _ -> ()
end;
let new_tail = Atomic.fenceless_get t.tail in
if new_tail != T tail then push backoff t value new_tail
else push_with backoff t tail_r.counter (T tail) value
end
else push_with backoff t tail_r.counter (T tail) value

and push_with backoff t counter prefix value =
let after = Snoc { counter = counter + 1; prefix; value } in
let new_tail = Atomic.fenceless_get t.tail in
if new_tail != prefix then push backoff t value new_tail
else if not (Atomic.compare_and_set t.tail prefix (T after)) then
let backoff = Backoff.once backoff in
push backoff t value (Atomic.fenceless_get t.tail)

let push t value = push Backoff.default t value (Atomic.fenceless_get t.tail)

exception Empty

let rec pop backoff t = function
| H (Cons cons_r as cons) ->
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
else
let backoff = Backoff.once backoff in
pop backoff t (Atomic.fenceless_get t.head)
| H (Head _ as head) -> begin
match Atomic.fenceless_get t.tail with
| T (Snoc snoc_r as move) -> begin
match snoc_r.prefix with
| T (Tail _ as tail) ->
(*let tail =
Tail { counter = snoc_r.counter - 1; move = Obj.magic () }
in
*)
let new_head = Atomic.get t.head in
if new_head != H head then pop backoff t new_head
else if Atomic.compare_and_set t.tail (T move) (T tail) then
snoc_r.value
else pop backoff t (Atomic.fenceless_get t.head)
| T (Snoc _) ->
let tail = Tail { counter = snoc_r.counter; move } in
let new_head = Atomic.get t.head in
if new_head != H head then pop backoff t new_head
else if Atomic.compare_and_set t.tail (T move) (T tail) then
pop_moving backoff t head move tail
else pop backoff t (Atomic.fenceless_get t.head)
end
| T (Tail tail_r as tail) ->
let move = tail_r.move in
if move == Obj.magic () then pop_emptyish backoff t head
else pop_moving backoff t head move tail
end

and pop_moving backoff t (Head head_r as head : (_, [< `Head ]) tdt)
(Snoc move_r as move) (Tail tail_r : (_, [< `Tail ]) tdt) =
if head_r.counter < move_r.counter then
match rev move with
| Cons cons_r ->
let after = cons_r.suffix in
let new_head = Atomic.get t.head in
if new_head != H head then pop backoff t new_head
else if Atomic.compare_and_set t.head (H head) after then begin
tail_r.move <- Obj.magic ();
cons_r.value
end
else
let backoff = Backoff.once backoff in
pop backoff t (Atomic.fenceless_get t.head)
else pop_emptyish backoff t head

and pop_emptyish backoff t head =
let new_head = Atomic.get t.head in
if new_head == H head then raise_notrace Empty else pop backoff t new_head

let pop t = pop Backoff.default t (Atomic.fenceless_get t.head)
let pop_opt t = match pop t with value -> Some value | exception Empty -> None

let rec length t =
let head = Atomic.get t.head in
let tail = Atomic.fenceless_get t.tail in
if head != Atomic.get t.head then length t
else
let head_at =
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
in
let tail_at =
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
in
tail_at - head_at + 1
20 changes: 20 additions & 0 deletions src_lockfree/two_stack_queue.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
type !'a t
(** *)

val create : unit -> 'a t
(** *)

val push : 'a t -> 'a -> unit
(** *)

exception Empty
(** Raised by {!pop} in case the queue is empty. *)

val pop : 'a t -> 'a
(** *)

val pop_opt : 'a t -> 'a option
(** *)

val length : 'a t -> int
(** *)
13 changes: 13 additions & 0 deletions test/two_stack_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
(test
(package saturn_lockfree)
(name stm_two_stack_queue)
(modules stm_two_stack_queue)
(libraries
saturn_lockfree
qcheck-core
qcheck-core.runner
qcheck-stm.stm
qcheck-stm.sequential
qcheck-stm.domain)
(action
(run %{test} --verbose)))
81 changes: 81 additions & 0 deletions test/two_stack_queue/stm_two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
module Queue = Saturn_lockfree.Two_stack_queue

let () =
let q = Queue.create () in
Queue.push q 101;
Queue.push q 42;
assert (Queue.pop_opt q = Some 101);
Queue.push q 76;
assert (Queue.pop_opt q = Some 42);
assert (Queue.pop_opt q = Some 76);
assert (Queue.pop_opt q = None)

module Queue_spec = struct
type cmd = Push of int | Pop_opt | Length

let show_cmd = function
| Push x -> "Push " ^ string_of_int x
| Pop_opt -> "Pop_opt"
| Length -> "Length"

module State = struct
type t = int list * int list

let push x (h, t) = if h == [] then ([ x ], []) else (h, x :: t)
let peek_opt (h, _) = match h with x :: _ -> Some x | [] -> None
let length (h, t) = List.length h + List.length t

let drop ((h, t) as s) =
match h with [] -> s | [ _ ] -> (List.rev t, []) | _ :: h -> (h, t)
end

type state = State.t
type sut = int Queue.t

let arb_cmd _s =
let open QCheck in
[
Gen.int_range 1 10000 |> Gen.map (fun x -> Push x);
Gen.return Pop_opt;
Gen.return Length;
]
|> Gen.oneof |> make ~print:show_cmd

let init_state = ([], [])
let init_sut () = Queue.create ()
let cleanup _ = ()

let next_state c s =
match c with
| Push x -> State.push x s
| Pop_opt -> State.drop s
| Length -> s

let precond _ _ = true

let run c d =
let open STM in
match c with
| Push x -> Res (unit, Queue.push d x)
| Pop_opt -> Res (option int, Queue.pop_opt d)
| Length -> Res (int, Queue.length d)

let postcond c (s : state) res =
let open STM in
match (c, res) with
| Push _x, Res ((Unit, _), ()) -> true
| Pop_opt, Res ((Option Int, _), res) -> res = State.peek_opt s
| Length, Res ((Int, _), res) -> res = State.length s
| _, _ -> false
end

module Queue_seq = STM_sequential.Make (Queue_spec)
module Queue_par = STM_domain.Make (Queue_spec)

let () =
let count = 1000 in
QCheck_base_runner.run_tests_main
[
Queue_seq.agree_test ~count ~name:"STM Two_stack_queue test sequential";
Queue_par.agree_test_par ~count ~name:"STM Two_stack_queue test parallel";
]

0 comments on commit e99cf2a

Please sign in to comment.