diff --git a/bench/bench_spsc_queue.ml b/bench/bench_spsc_queue.ml index 7ca6e2ee..9dab65f9 100644 --- a/bench/bench_spsc_queue.ml +++ b/bench/bench_spsc_queue.ml @@ -1,37 +1,67 @@ -open Lockfree - let item_count = 2_000_000 -let rec try_until_success f = - try f () with Spsc_queue.Full -> try_until_success f - -let run () = - let queue = Spsc_queue.create ~size_exponent:3 in - let pusher = - Domain.spawn (fun () -> - let start_time = Unix.gettimeofday () in - for i = 1 to item_count do - try_until_success (fun () -> Spsc_queue.push queue i) - done; - start_time) - in - for _ = 1 to item_count do - while Option.is_none (Spsc_queue.pop queue) do - () - done - done; - let end_time = Unix.gettimeofday () in - let start_time = Domain.join pusher in - let time_diff = end_time -. start_time in - time_diff - -let bench () = - let results = ref [] in - for i = 1 to 10 do - let time = run () in - if i > 1 then results := time :: !results - done; - let results = List.sort Float.compare !results in - let median_time = List.nth results 4 in - let median_throughput = Float.of_int item_count /. median_time in - Benchmark_result.create_generic ~median_time ~median_throughput "spsc-queue" +module type QUEUE = sig + type 'a t + + val make : unit -> int t + val push : 'a t -> 'a -> unit + val pop : 'a t -> 'a option + val name : string +end + +module Bench (Q : QUEUE) = struct + let run () = + let queue = Q.make () in + let pusher = + Domain.spawn (fun () -> + let start_time = Unix.gettimeofday () in + for i = 1 to item_count do + Q.push queue i + done; + start_time) + in + for _ = 1 to item_count do + while Option.is_none (Q.pop queue) do + () + done + done; + let end_time = Unix.gettimeofday () in + let start_time = Domain.join pusher in + let time_diff = end_time -. start_time in + time_diff + + let bench () = + let results = ref [] in + for i = 1 to 10 do + let time = run () in + if i > 1 then results := time :: !results + done; + let results = List.sort Float.compare !results in + let median_time = List.nth results 4 in + let median_throughput = Float.of_int item_count /. median_time in + Benchmark_result.create_generic ~median_time ~median_throughput Q.name +end + +module Spsc_queue = Bench (struct + include Lockfree.Spsc_queue + + let make () = create ~size_exponent:3 + let rec push t x = try Lockfree.Spsc_queue.push t x with Full -> push t x + let name = "spsc-queue" +end) + +module Mpmc_queue = Bench (struct + include Lockfree.Mpmc_queue + + let make () = make ~dummy:(-1) () + let name = "mpmc-queue" +end) + +module Michael_scott_queue = Bench (struct + include Lockfree.Michael_scott_queue + + let make () = create () + let name = "michael-scott-queue" +end) + +let bench = [ Spsc_queue.bench; Mpmc_queue.bench; Michael_scott_queue.bench ] diff --git a/bench/bench_spsc_queue.mli b/bench/bench_spsc_queue.mli index fa7ab819..9a98639d 100644 --- a/bench/bench_spsc_queue.mli +++ b/bench/bench_spsc_queue.mli @@ -1 +1 @@ -val bench : unit -> Benchmark_result.t +val bench : (unit -> Benchmark_result.t) list diff --git a/bench/main.ml b/bench/main.ml index c6474cb9..2650dbfd 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -8,31 +8,22 @@ let backoff_benchmarks = ] let benchmark_list = - [ - Bench_spsc_queue.bench; - Mpmc_queue.bench ~takers:4 ~pushers:4; - Mpmc_queue.bench ~takers:1 ~pushers:8; - Mpmc_queue.bench ~takers:8 ~pushers:1; - Mpmc_queue.bench ~use_cas:true ~takers:4 ~pushers:4; - Mpmc_queue.bench ~use_cas:true ~takers:1 ~pushers:8; - Mpmc_queue.bench ~use_cas:true ~takers:8 ~pushers:1; - ] - @ backoff_benchmarks + Bench_spsc_queue.bench @ Mpmc_queue.bench @ backoff_benchmarks let () = - let results = - (* todo: should assert no stranded domains between tests. *) - List.map (fun f -> f ()) benchmark_list - |> List.map Benchmark_result.to_json - |> String.concat ", " - in - let output = - Printf.sprintf {| {"name": "lockfree", "results": [%s]}|} results - (* Cannot use Yojson rewriters as of today none works on OCaml 5.1.0. - This at least verifies that the manually crafted JSON is well-formed. + List.iter + (fun f -> + (* todo: should assert no stranded domains between tests. *) + let r = f () in + let r = Benchmark_result.to_json r in + let output = + Printf.sprintf {| {"name": "lockfree", "results": [%s]}|} r + (* Cannot use Yojson rewriters as of today none works on OCaml 5.1.0. + This at least verifies that the manually crafted JSON is well-formed. - If the type grow, we could switch to running ppx manually on 5.0.0 and - pasting in its output. *) - |> Yojson.Basic.prettify - in - Printf.printf "%s" output + If the type grow, we could switch to running ppx manually on 5.0.0 and + pasting in its output. *) + |> Yojson.Basic.prettify + in + Printf.printf "%s\n%!" output) + benchmark_list diff --git a/bench/mpmc_queue.ml b/bench/mpmc_queue.ml index 2c919d15..9c958c0c 100644 --- a/bench/mpmc_queue.ml +++ b/bench/mpmc_queue.ml @@ -1,99 +1,196 @@ -open Lockfree.Mpmc_relaxed_queue - -let num_of_elements = ref 500_000 -let num_of_pushers = ref 4 -let num_of_takers = ref 4 -let num_of_iterations = ref 10 -let use_cas_intf = ref false -let pop = ref Not_lockfree.pop -let push = ref Not_lockfree.push - -let taker queue num_of_elements () = - let i = ref 0 in - while !i < num_of_elements do - if Option.is_some (!pop queue) then i := !i + 1 - done - -let pusher queue num_of_elements () = - let i = ref 0 in - while !i < num_of_elements do - if !push queue !i then i := !i + 1 - done - -let create_output ~time_median ~throughput_median ~throughput_stddev = - let time = - ({ - name = "time"; - value = `Numeric time_median; - units = "s"; - description = "median time"; - } - : Benchmark_result.Metric.t) - in - let throughput = - ({ - name = "throughput"; - value = `Numeric throughput_median; - units = "item/s"; - description = "median throughput"; - } - : Benchmark_result.Metric.t) - in - let throughput_stddev = - ({ - name = "throughput-stddev"; - value = `Numeric throughput_stddev; - units = "item/s"; - description = "stddev throughput"; - } - : Benchmark_result.Metric.t) - in - let metrics = [ time; throughput; throughput_stddev ] in - let name = - Printf.sprintf "mpmc-queue-pushers:%d,takers:%d,use-cas:%b" !num_of_pushers - !num_of_takers !use_cas_intf - in - ({ name; metrics } : Benchmark_result.t) - -let run_bench () = - if !use_cas_intf then ( - push := Lockfree.Mpmc_relaxed_queue.Not_lockfree.CAS_interface.push; - pop := Lockfree.Mpmc_relaxed_queue.Not_lockfree.CAS_interface.pop); - let queue = create ~size_exponent:10 () in - let orchestrator = - Orchestrator.init - ~total_domains:(!num_of_takers + !num_of_pushers) - ~rounds:!num_of_iterations - in - (* define function to start domains *) - let start_n_domains n f = - assert (!num_of_elements mod n == 0); - let items_per_pusher = !num_of_elements / n in - List.init n (fun _ -> - Domain.spawn (fun () -> - Orchestrator.worker orchestrator (f queue items_per_pusher))) - in - (* start domains *) - let domains = - let takers = start_n_domains !num_of_takers taker in - let pushers = start_n_domains !num_of_pushers pusher in - Sys.opaque_identity (pushers @ takers) - in - (* run test *) - let times = Orchestrator.run orchestrator in - List.iter Domain.join domains; - let time_median = Stats.median times in - let throughputs = - List.map (fun time -> Int.to_float !num_of_elements /. time) times +module type QUEUE = sig + type 'a t + + val make : unit -> int t + val push : 'a t -> 'a -> unit + val pop : 'a t -> 'a option + val name : string +end + +module Bench (Q : QUEUE) = struct + let num_of_elements = ref 2_100_000 + let num_of_pushers = ref 4 + let num_of_takers = ref 4 + let num_of_iterations = ref 20 + + let taker queue num_of_elements () = + let i = ref 0 in + while !i < num_of_elements do + if Option.is_some (Q.pop queue) then i := !i + 1 + done + + let pusher queue num_of_elements () = + for i = 0 to num_of_elements - 1 do + Q.push queue i + done + + let create_output ~time_median ~throughput_median ~throughput_stddev = + let time = + ({ + name = "time"; + value = `Numeric time_median; + units = "s"; + description = "median time"; + } + : Benchmark_result.Metric.t) + in + let throughput = + ({ + name = "throughput"; + value = `Numeric throughput_median; + units = "item/s"; + description = "median throughput"; + } + : Benchmark_result.Metric.t) + in + let throughput_stddev = + ({ + name = "throughput-stddev"; + value = `Numeric throughput_stddev; + units = "item/s"; + description = "stddev throughput"; + } + : Benchmark_result.Metric.t) + in + let metrics = [ time; throughput; throughput_stddev ] in + let name = + Printf.sprintf "%s-pushers:%d,takers:%d" Q.name !num_of_pushers + !num_of_takers + in + ({ name; metrics } : Benchmark_result.t) + + let run_bench () = + let queue = Q.make () in + let orchestrator = + Orchestrator.init + ~total_domains:(!num_of_takers + !num_of_pushers) + ~rounds:!num_of_iterations + in + (* define function to start domains *) + let start_n_domains n f = + assert (!num_of_elements mod n == 0); + let items_per_pusher = !num_of_elements / n in + List.init n (fun _ -> + Domain.spawn (fun () -> + Orchestrator.worker orchestrator (f queue items_per_pusher))) + in + (* start domains *) + let domains = + let takers = start_n_domains !num_of_takers taker in + let pushers = start_n_domains !num_of_pushers pusher in + Sys.opaque_identity (pushers @ takers) + in + (* run test *) + let times = Orchestrator.run orchestrator in + List.iter Domain.join domains; + let time_median = Stats.median times in + let throughputs = + List.map (fun time -> Int.to_float !num_of_elements /. time) times + in + let throughput_median = Stats.median throughputs in + let throughput_stddev = Stats.stddev throughputs in + create_output ~time_median ~throughput_median ~throughput_stddev + + let benchmark ?takers ?pushers ?iterations ?elements () = + num_of_takers := Option.value takers ~default:!num_of_takers; + num_of_pushers := Option.value pushers ~default:!num_of_pushers; + num_of_iterations := Option.value iterations ~default:!num_of_iterations; + num_of_elements := Option.value elements ~default:!num_of_elements; + run_bench () + + let bench_one_producer : (unit -> _) list = + [ + benchmark ~takers:1 ~pushers:1; + benchmark ~takers:2 ~pushers:1; + benchmark ~takers:3 ~pushers:1; + benchmark ~takers:4 ~pushers:1; + benchmark ~takers:5 ~pushers:1; + benchmark ~takers:6 ~pushers:1; + benchmark ~takers:7 ~pushers:1; + ] + + let bench_one_consumer : (unit -> _) list = + [ + benchmark ~takers:1 ~pushers:2; + benchmark ~takers:1 ~pushers:3; + benchmark ~takers:1 ~pushers:4; + benchmark ~takers:1 ~pushers:5; + benchmark ~takers:1 ~pushers:6; + benchmark ~takers:1 ~pushers:7; + ] + + let bench_balanced : (unit -> _) list = + [ + benchmark ~takers:2 ~pushers:2; + benchmark ~takers:3 ~pushers:3; + benchmark ~takers:4 ~pushers:4; + ] + + let bench = bench_one_producer @ bench_one_consumer @ bench_balanced +end + +module Michael_scott_queue = Bench (struct + let name = "michael-scott-queue" + + include Lockfree.Michael_scott_queue + + let make () = create () +end) + +module Relaxed = Bench (struct + let name = "mpmc-relaxed-fad" + + module Q = Lockfree.Mpmc_relaxed_queue + include Q.Not_lockfree + + type 'a t = 'a Q.t + + let make () = Q.create ~size_exponent:10 () + let rec push t x = if not (Q.Not_lockfree.push t x) then push t x +end) + +module Relaxed_cas = Bench (struct + let name = "mpmc-relaxed-cas" + + module Q = Lockfree.Mpmc_relaxed_queue + include Q.Not_lockfree.CAS_interface + + type 'a t = 'a Q.t + + let make () = Q.create ~size_exponent:10 () + + let rec push t x = + if not (Q.Not_lockfree.CAS_interface.push t x) then push t x +end) + +module Unbounded = Bench (struct + let name = "mpmc-unbounded" + + include Lockfree.Mpmc_queue + + let make () = make ~dummy:(-1) () +end) + +module Ws_deque = Bench (struct + let name = "work-stealing-deque" + + include Lockfree.Ws_deque.M + + let make () = create () + let pop t = try Some (steal t) with Exit -> None +end) + +let bench = + Michael_scott_queue.bench @ Relaxed.bench @ Relaxed_cas.bench + @ Unbounded.bench @ Ws_deque.bench_one_producer + +let benchmark ~takers ~pushers ~impl ~iterations ~elements () = + let impl = + match impl with + | `MS -> Michael_scott_queue.benchmark + | `CAS -> Relaxed_cas.benchmark + | `FAD -> Relaxed.benchmark + | `Unbounded -> Unbounded.benchmark + | `WS -> Ws_deque.benchmark in - let throughput_median = Stats.median throughputs in - let throughput_stddev = Stats.stddev throughputs in - create_output ~time_median ~throughput_median ~throughput_stddev - -let bench ?takers ?pushers ?use_cas ?iterations ?elements () = - num_of_takers := Option.value takers ~default:!num_of_takers; - num_of_pushers := Option.value pushers ~default:!num_of_pushers; - use_cas_intf := Option.value use_cas ~default:!use_cas_intf; - num_of_iterations := Option.value iterations ~default:!num_of_iterations; - num_of_elements := Option.value elements ~default:!num_of_elements; - run_bench () + impl ~takers ~pushers ~iterations ~elements () diff --git a/bench/mpmc_queue.mli b/bench/mpmc_queue.mli index 0cce0278..15bef053 100644 --- a/bench/mpmc_queue.mli +++ b/bench/mpmc_queue.mli @@ -1,8 +1,10 @@ -val bench : - ?takers:int -> - ?pushers:int -> - ?use_cas:bool -> - ?iterations:int -> - ?elements:int -> +val bench : (unit -> Benchmark_result.t) list + +val benchmark : + takers:int -> + pushers:int -> + impl:[ `MS | `CAS | `FAD | `Unbounded | `WS ] -> + iterations:int -> + elements:int -> unit -> Benchmark_result.t diff --git a/bench/mpmc_queue_cmd.ml b/bench/mpmc_queue_cmd.ml index abf19c1b..db8fd721 100644 --- a/bench/mpmc_queue_cmd.ml +++ b/bench/mpmc_queue_cmd.ml @@ -2,7 +2,17 @@ let elements = ref 100_000 let pushers = ref 4 let takers = ref 4 let iterations = ref 10 -let use_cas = ref false +let impl = ref `FAD + +let use_impl = function + | "MS" -> impl := `MS + | "CAS" -> impl := `CAS + | "FAD" -> impl := `FAD + | "UNBOUNDED" -> impl := `Unbounded + | "WS" -> impl := `WS + | str -> + Printf.ksprintf failwith "-impl expected MS|CAS|FAD|UNBOUNDED|WS, got %S" + str let speclist = [ @@ -10,16 +20,18 @@ let speclist = ("-pushers", Arg.Set_int pushers, "number of domains pushing items"); ("-takers", Arg.Set_int takers, "number of domains taking times"); ("-iterations", Arg.Set_int iterations, "run the benchmark this many times"); - ("-use-cas", Arg.Set use_cas, "use CAS instead of FAD"); + ( "-impl", + Arg.String use_impl, + "queue implementation to use: MS or CAS or FAD or UNBOUNDED or WS" ); ] -let _f () = +let () = Arg.parse speclist (fun _ -> ()) "mpmc_queue.exe [-items INT] [-pushers INT] [-takers INT] [-iterations \ - INT] [-use-cas]"; + INT] [-impl CAS|FAD|UNBOUNDED]"; let result = - Mpmc_queue.bench ~takers:!takers ~pushers:!pushers ~use_cas:!use_cas + Mpmc_queue.benchmark ~takers:!takers ~pushers:!pushers ~impl:!impl ~iterations:!iterations ~elements:!elements () in - Benchmark_result.to_json result |> Yojson.Basic.prettify |> print_string + Benchmark_result.to_json result |> Yojson.Basic.prettify |> print_endline diff --git a/lockfree.opam b/lockfree.opam index 606c8818..d37bcc45 100644 --- a/lockfree.opam +++ b/lockfree.opam @@ -14,6 +14,7 @@ depends: [ "domain_shims" {>= "0.1.0"} "qcheck" {with-test & >= "0.18.1"} "qcheck-stm" {with-test & >= "0.1"} + "qcheck-lin" {with-test & >= "0.1"} "qcheck-alcotest" {with-test & >= "0.18.1"} "alcotest" {>= "1.6.0"} "yojson" {>= "2.0.2"} diff --git a/src/lockfree.ml b/src/lockfree.ml index 57eb3b9d..0b529abd 100644 --- a/src/lockfree.ml +++ b/src/lockfree.ml @@ -28,6 +28,7 @@ Copyright (c) 2017, Nicolas ASSOUAD module Ws_deque = Ws_deque module Spsc_queue = Spsc_queue module Mpsc_queue = Mpsc_queue +module Mpmc_queue = Mpmc_queue module Treiber_stack = Treiber_stack module Michael_scott_queue = Michael_scott_queue module Backoff = Backoff diff --git a/src/lockfree.mli b/src/lockfree.mli index 5b50ca60..3bba19b9 100644 --- a/src/lockfree.mli +++ b/src/lockfree.mli @@ -33,6 +33,7 @@ Copyright (c) 2017, Nicolas ASSOUAD module Ws_deque = Ws_deque module Spsc_queue = Spsc_queue module Mpsc_queue = Mpsc_queue +module Mpmc_queue = Mpmc_queue module Treiber_stack = Treiber_stack module Michael_scott_queue = Michael_scott_queue module Mpmc_relaxed_queue = Mpmc_relaxed_queue diff --git a/src/mpmc_queue.ml b/src/mpmc_queue.ml new file mode 100644 index 00000000..f80e2d39 --- /dev/null +++ b/src/mpmc_queue.ml @@ -0,0 +1,110 @@ +module Array = struct + include Array + + let get = unsafe_get + let set = unsafe_set +end + +let default_capacity = 512 + +type 'a s = { + status : int Atomic.t array; + buffer : 'a array; + head : int Atomic.t; + tail : int Atomic.t; + rest : 'a s option Atomic.t; +} + +type 'a t = { first : 'a s Atomic.t; last : 'a s Atomic.t; dummy : 'a } + +let pack_size = Sys.int_size / 2 + +let make_s ~capacity ~dummy = + { + head = Atomic.make 0; + tail = Atomic.make (-1); + buffer = Array.make capacity dummy; + status = Array.init (1 + (capacity / pack_size)) (fun _ -> Atomic.make 0); + rest = Atomic.make None; + } + +let make ?(capacity = default_capacity) ~dummy () = + let s = make_s ~capacity ~dummy in + { first = Atomic.make s; last = Atomic.make s; dummy } + +let rec gift_rest t some_s = + if not (Atomic.compare_and_set t.rest None some_s) then follow_rest t some_s + +and follow_rest t some_s = + match Atomic.get t.rest with + | None -> gift_rest t some_s + | Some t -> follow_rest t some_s + +let force_rest ~dummy t = + match Atomic.get t.rest with + | Some s -> s + | None -> ( + let s = make_s ~capacity:(Array.length t.buffer) ~dummy in + let some_s = Some s in + if Atomic.compare_and_set t.rest None some_s then s + else + match Atomic.get t.rest with + | None -> assert false + | Some rest -> + gift_rest rest some_s; + rest) + +let mark t i = + let status = t.status.(i / pack_size) in + let shift = 2 * (i mod pack_size) in + let status = Atomic.fetch_and_add status (1 lsl shift) in + (status lsr shift) land 1 = 0 + +let rec push_s ~dummy t x = + let i = Atomic.fetch_and_add t.tail 1 in + if i < 0 then + let _ = force_rest ~dummy t in + push_s ~dummy t x + else if i >= Array.length t.buffer then false + else ( + t.buffer.(i) <- x; + if mark t i then true + else + let hd = Atomic.get t.head in + let (_ : bool) = Atomic.compare_and_set t.tail (i + 1) hd in + t.buffer.(i) <- dummy; + push_s ~dummy t x) + +let rec push ({ last; dummy; _ } as t) x = + let last_s = Atomic.get last in + if not (push_s ~dummy last_s x) then + let rest = force_rest ~dummy last_s in + let (_ : bool) = Atomic.compare_and_set last last_s rest in + push t x + +type 'a pop_result = Is_empty | Wait_for_it | Pop of 'a + +let rec pop_s ~dummy t = + let current_head = Atomic.get t.head in + if current_head >= Array.length t.buffer then Is_empty + else if current_head >= Atomic.get t.tail then Wait_for_it + else + let i = Atomic.fetch_and_add t.head 1 in + if i >= Array.length t.buffer then Is_empty + else if mark t i then pop_s ~dummy t + else + let v = t.buffer.(i) in + t.buffer.(i) <- dummy; + Pop v + +let rec pop t = + let first = Atomic.get t.first in + match pop_s ~dummy:t.dummy first with + | Pop v -> Some v + | Wait_for_it -> None + | Is_empty -> ( + match Atomic.get first.rest with + | None -> None + | Some rest -> + let (_ : bool) = Atomic.compare_and_set t.first first rest in + pop t) diff --git a/src/mpmc_queue.mli b/src/mpmc_queue.mli new file mode 100644 index 00000000..70f6f174 --- /dev/null +++ b/src/mpmc_queue.mli @@ -0,0 +1,21 @@ +(** A lock-free multi-producer, multi-consummer, thread-safe unbounded queue. *) + +type 'a t +(** A queue of items of type ['a]. *) + +val make : ?capacity:int -> dummy:'a -> unit -> 'a t +(** [make ~dummy ()] creates a new empty queue. + + - The [dummy] element is a placeholder for ['a] values. + - The optional parameter [?capacity] defaults to 512 and is used to size the + internal buffers of the queue: Choosing a small number lower the pause + durations caused by allocations, but a larger capacity can provide overall + faster operations. +*) + +val push : 'a t -> 'a -> unit +(** [push t x] adds [x] to the tail of the queue. *) + +val pop : 'a t -> 'a option +(** [pop t] removes the head item from [t] and returns it. + Returns [None] if [t] is currently empty. *) diff --git a/test/mpmc_queue/dune b/test/mpmc_queue/dune new file mode 100644 index 00000000..a3c17746 --- /dev/null +++ b/test/mpmc_queue/dune @@ -0,0 +1,3 @@ +(test + (name lin_mpmc_queue) + (libraries lockfree qcheck-lin.domain)) diff --git a/test/mpmc_queue/lin_mpmc_queue.ml b/test/mpmc_queue/lin_mpmc_queue.ml new file mode 100644 index 00000000..2803f68c --- /dev/null +++ b/test/mpmc_queue/lin_mpmc_queue.ml @@ -0,0 +1,22 @@ +module Q = Lockfree.Mpmc_queue + +module Test = struct + type t = int Q.t + + let init () = Q.make ~capacity:1 ~dummy:(-1) () + let cleanup _ = () + + open Lin + + let api = + [ + val_ "push" Q.push (t @-> int @-> returning unit); + val_ "pop" Q.pop (t @-> returning (option int)); + ] +end + +module T = Lin_domain.Make (Test) + +let () = + QCheck_base_runner.run_tests_main + [ T.lin_test ~count:10_000 ~name:"MPMC Queue" ]