Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPMC unbounded queue #35

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 65 additions & 35 deletions bench/bench_spsc_queue.ml
Original file line number Diff line number Diff line change
@@ -1,37 +1,67 @@
open Lockfree

let item_count = 2_000_000

let rec try_until_success f =
try f () with Spsc_queue.Full -> try_until_success f

let run () =
let queue = Spsc_queue.create ~size_exponent:3 in
let pusher =
Domain.spawn (fun () ->
let start_time = Unix.gettimeofday () in
for i = 1 to item_count do
try_until_success (fun () -> Spsc_queue.push queue i)
done;
start_time)
in
for _ = 1 to item_count do
while Option.is_none (Spsc_queue.pop queue) do
()
done
done;
let end_time = Unix.gettimeofday () in
let start_time = Domain.join pusher in
let time_diff = end_time -. start_time in
time_diff

let bench () =
let results = ref [] in
for i = 1 to 10 do
let time = run () in
if i > 1 then results := time :: !results
done;
let results = List.sort Float.compare !results in
let median_time = List.nth results 4 in
let median_throughput = Float.of_int item_count /. median_time in
Benchmark_result.create_generic ~median_time ~median_throughput "spsc-queue"
module type QUEUE = sig
type 'a t

val make : unit -> int t
val push : 'a t -> 'a -> unit
val pop : 'a t -> 'a option
val name : string
end

module Bench (Q : QUEUE) = struct
let run () =
let queue = Q.make () in
let pusher =
Domain.spawn (fun () ->
let start_time = Unix.gettimeofday () in
for i = 1 to item_count do
Q.push queue i
done;
start_time)
in
for _ = 1 to item_count do
while Option.is_none (Q.pop queue) do
()
done
done;
let end_time = Unix.gettimeofday () in
let start_time = Domain.join pusher in
let time_diff = end_time -. start_time in
time_diff

let bench () =
let results = ref [] in
for i = 1 to 10 do
let time = run () in
if i > 1 then results := time :: !results
done;
let results = List.sort Float.compare !results in
let median_time = List.nth results 4 in
let median_throughput = Float.of_int item_count /. median_time in
Benchmark_result.create_generic ~median_time ~median_throughput Q.name
end

module Spsc_queue = Bench (struct
include Lockfree.Spsc_queue

let make () = create ~size_exponent:3
let rec push t x = try Lockfree.Spsc_queue.push t x with Full -> push t x
let name = "spsc-queue"
end)

module Mpmc_queue = Bench (struct
include Lockfree.Mpmc_queue

let make () = make ~dummy:(-1) ()
let name = "mpmc-queue"
end)

module Michael_scott_queue = Bench (struct
include Lockfree.Michael_scott_queue

let make () = create ()
let name = "michael-scott-queue"
end)

let bench = [ Spsc_queue.bench; Mpmc_queue.bench; Michael_scott_queue.bench ]
2 changes: 1 addition & 1 deletion bench/bench_spsc_queue.mli
Original file line number Diff line number Diff line change
@@ -1 +1 @@
val bench : unit -> Benchmark_result.t
val bench : (unit -> Benchmark_result.t) list
41 changes: 16 additions & 25 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,22 @@ let backoff_benchmarks =
]

let benchmark_list =
[
Bench_spsc_queue.bench;
Mpmc_queue.bench ~takers:4 ~pushers:4;
Mpmc_queue.bench ~takers:1 ~pushers:8;
Mpmc_queue.bench ~takers:8 ~pushers:1;
Mpmc_queue.bench ~use_cas:true ~takers:4 ~pushers:4;
Mpmc_queue.bench ~use_cas:true ~takers:1 ~pushers:8;
Mpmc_queue.bench ~use_cas:true ~takers:8 ~pushers:1;
]
@ backoff_benchmarks
Bench_spsc_queue.bench @ Mpmc_queue.bench @ backoff_benchmarks

let () =
let results =
(* todo: should assert no stranded domains between tests. *)
List.map (fun f -> f ()) benchmark_list
|> List.map Benchmark_result.to_json
|> String.concat ", "
in
let output =
Printf.sprintf {| {"name": "lockfree", "results": [%s]}|} results
(* Cannot use Yojson rewriters as of today none works on OCaml 5.1.0.
This at least verifies that the manually crafted JSON is well-formed.
List.iter
(fun f ->
(* todo: should assert no stranded domains between tests. *)
let r = f () in
let r = Benchmark_result.to_json r in
let output =
Printf.sprintf {| {"name": "lockfree", "results": [%s]}|} r
(* Cannot use Yojson rewriters as of today none works on OCaml 5.1.0.
This at least verifies that the manually crafted JSON is well-formed.

If the type grow, we could switch to running ppx manually on 5.0.0 and
pasting in its output. *)
|> Yojson.Basic.prettify
in
Printf.printf "%s" output
If the type grow, we could switch to running ppx manually on 5.0.0 and
pasting in its output. *)
|> Yojson.Basic.prettify
in
Printf.printf "%s\n%!" output)
benchmark_list
Loading