Skip to content

Commit

Permalink
Implement parallel broadcall
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Dec 16, 2023
1 parent cc2701c commit 52bdd15
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 4 deletions.
8 changes: 7 additions & 1 deletion src/wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
-export([start/0, start/2, stop/0, stop/1]).
-export([start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]).
-export([stop_pool/1, stop_sup_pool/1]).
-export([call/2, cast/2, call/3, cast/3, call/4, broadcast/2]).
-export([call/2, cast/2, call/3, cast/3, call/4, broadcall/3, broadcast/2]).
-export([send_request/2, send_request/3, send_request/4]).
-export([stats/0, stats/1, get_workers/1]).
-export([default_strategy/0]).
Expand Down Expand Up @@ -239,3 +239,9 @@ get_workers(Sup) ->
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Sup, Cast) ->
wpool_pool:broadcast(Sup, Cast).

%% @doc Calls all the workers within the given pool async and waits for the responses synchronously
-spec broadcall(wpool:name(), term(), timeout()) ->
{[Replies :: term()], [Errors :: term()]}.
broadcall(Sup, Call, Timeout) ->
wpool_pool:broadcall(Sup, Call, Timeout).
27 changes: 26 additions & 1 deletion src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-export([start_link/2]).
-export([best_worker/1, random_worker/1, next_worker/1, hash_worker/2,
next_available_worker/1, send_request_available_worker/3, call_available_worker/3]).
-export([cast_to_available_worker/2, broadcast/2]).
-export([cast_to_available_worker/2, broadcast/2, broadcall/3]).
-export([stats/0, stats/1, get_workers/1]).
-export([worker_name/2, find_wpool/1]).
-export([next/2, wpool_get/2]).
Expand Down Expand Up @@ -158,6 +158,31 @@ broadcast(Name, Cast) ->
lists:foreach(fun(Worker) -> ok = wpool_process:cast(Worker, Cast) end,
all_workers(Name)).

%% @doc Calls all workers in the pool in parallel
%%
%% Waits for responses in parallel too, and it assumes that if any response times out,
%% all of them did too and therefore exits with reason timeout like a regular `gen_server' does.
-spec broadcall(wpool:name(), term(), timeout()) ->
{[Replies :: term()], [Errors :: term()]}.
broadcall(Name, Call, Timeout) ->
Workers = all_workers(Name),
ReqId0 = gen_server:reqids_new(),
RequestFold = fun(Worker, Acc) -> gen_server:send_request(Worker, Call, Name, Acc) end,
ReqId1 = lists:foldl(RequestFold, ReqId0, Workers),
WaitFold =
fun(_, {Coll, Replies, Errors}) ->
case gen_server:receive_response(Coll, Timeout, true) of
{{reply, Reply}, _, Coll1} ->
{Coll1, [Reply | Replies], Errors};
{{error, Error}, _, Coll1} ->
{Coll1, Replies, [Error | Errors]};
timeout ->
exit({timeout, {?MODULE, broadcall, [Name, Call, Timeout]}})
end
end,
{_, Replies, Errors} = lists:foldl(WaitFold, {ReqId1, [], []}, Workers),
{Replies, Errors}.

-spec all() -> [wpool:name()].
all() ->
[Name || {{?MODULE, Name}, _} <- persistent_term:get(), find_wpool(Name) /= undefined].
Expand Down
37 changes: 35 additions & 2 deletions test/wpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
-export([init_per_suite/1, end_per_suite/1]).
-export([stats/1, stop_pool/1, non_brutal_shutdown/1, brutal_worker_shutdown/1, overrun/1,
kill_on_overrun/1, too_much_overrun/1, default_strategy/1, overrun_handler1/1,
overrun_handler2/1, default_options/1, complete_coverage/1, broadcast/1, send_request/1,
worker_killed_stats/1]).
overrun_handler2/1, default_options/1, complete_coverage/1, broadcall/1, broadcast/1,
send_request/1, worker_killed_stats/1]).

-elvis([{elvis_style, no_block_expressions, disable}]).

Expand All @@ -46,6 +46,7 @@ all() ->
default_options,
complete_coverage,
broadcast,
broadcall,
send_request,
kill_on_overrun,
worker_killed_stats].
Expand Down Expand Up @@ -384,6 +385,38 @@ complete_coverage(_Config) ->

{comment, []}.

-spec broadcall(config()) -> {comment, []}.
broadcall(_Config) ->
Pool = broadcall,
WorkersCount = 19,
{ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount}]),

ct:comment("Check mecked function is called ~p times.", [WorkersCount]),
meck:new(x, [non_strict]),
meck:expect(x, x, fun() -> ok end),
% Broadcall x:x() execution to workers.
{[_ | _], _} = wpool:broadcall(Pool, {x, x, []}, infinity),
% Give some time for the workers to perform the calls.
WorkersCount = ktn_task:wait_for(fun() -> meck:num_calls(x, x, '_') end, WorkersCount),

ct:comment("Check they all are \"working\""),
try
% Make all the workers sleep for 1.5 seconds
wpool:broadcall(Pool, {timer, sleep, [1500]}, 1000),
% check they all are actually busy (executing timer:sleep/1 function).
ct:fail("They finished before the timeout")
catch
_:{timeout, _} ->
ok
end,

ct:comment("Check that failures are delivered as errors"),
meck:expect(x, fail, fun() -> exit(self(), shutdown) end),
{_, [_ | _]} = wpool:broadcall(Pool, {x, fail, []}, infinity),

meck:unload(x),
{comment, []}.

-spec broadcast(config()) -> {comment, []}.
broadcast(_Config) ->
Pool = broadcast,
Expand Down

0 comments on commit 52bdd15

Please sign in to comment.