Skip to content

Commit

Permalink
Merge pull request #649 from basho/feature/ensembles-wait-for-riak-kv
Browse files Browse the repository at this point in the history
Feature/ensembles wait for riak kv
  • Loading branch information
engelsanchez committed Jun 23, 2014
2 parents 30cb327 + d32d007 commit 3662965
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 15 deletions.
22 changes: 14 additions & 8 deletions intercepts/intercept.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
-module(intercept).
%% Export explicit API but also send compile directive to export all
%% becuase some of these private functions are useful in their own
%% because some of these private functions are useful in their own
%% right.
-export([add/3]).
-export([add/3, add/4]).
-compile(export_all).

-type abstract_code() :: term().
Expand Down Expand Up @@ -35,24 +35,30 @@
%%
%% E.g. `[{{update_perform,2}, sleep_update_perform}]'
-spec add(module(), module(), mapping()) -> ok.
add(Target, Intercept, Mapping) ->
add(Target, Intercept, Mapping, OutDir) ->
Original = ?ORIGINAL(Target),
TargetAC = get_abstract_code(Target),

ProxyAC = make_proxy_abstract_code(Target, Intercept, Mapping,
Original, TargetAC),
OrigAC = make_orig_abstract_code(Target, Original, TargetAC),

ok = compile_and_load(Original, OrigAC),
ok = compile_and_load(Target, ProxyAC).
ok = compile_and_load(Original, OrigAC, OutDir),
ok = compile_and_load(Target, ProxyAC, OutDir).

add(Target, Intercept, Mapping) ->
add(Target, Intercept, Mapping, undefined).

%% @private
%%
%% @doc Compile the abstract code `AC' and load it into the code server.
-spec compile_and_load(module(), abstract_code()) -> ok.
compile_and_load(Module, AC) ->
-spec compile_and_load(module(), abstract_code(), undefined | string()) -> ok.
compile_and_load(Module, AC, OutDir) ->
{ok, Module, Bin} = compile:forms(AC,[debug_info]),
{module, Module} = code:load_binary(Module, atom_to_list(Module), Bin),
ModStr = atom_to_list(Module),
_ = is_list(OutDir) andalso
file:write_file(filename:join(OutDir, ModStr ++ ".beam"), Bin),
{module, Module} = code:load_binary(Module, ModStr, Bin),
ok.

%% @private
Expand Down
20 changes: 14 additions & 6 deletions src/rt_intercept.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,24 @@ load_code(Node) ->
[ok = remote_compile_and_load(Node, F) || F <- intercept_files()],
ok.

add(Node, Intercepts) when is_list(Intercepts) ->
[ok = add(Node, I) || I <- Intercepts],
add_and_save(Node, Intercepts) ->
CodePaths = rpc:call(Node, code, get_path, []),
[PatchesDir] = [P || P <- CodePaths, lists:suffix("basho-patches", P)],
add(Node, Intercepts, PatchesDir).

add(Node, Intercepts) ->
add(Node, Intercepts, undefined).

add(Node, Intercepts, OutDir) when is_list(Intercepts) ->
[ok = add(Node, I, OutDir) || I <- Intercepts],
ok;

add(Node, {Target, Mapping}) ->
add(Node, {Target, ?DEFAULT_INTERCEPT(Target), Mapping});
add(Node, {Target, Mapping}, OutDir) ->
add(Node, {Target, ?DEFAULT_INTERCEPT(Target), Mapping}, OutDir);

add(Node, {Target, Intercept, Mapping}) ->
add(Node, {Target, Intercept, Mapping}, OutDir) ->
NMapping = [transform_anon_fun(M) || M <- Mapping],
ok = rpc:call(Node, intercept, add, [Target, Intercept, NMapping]).
ok = rpc:call(Node, intercept, add, [Target, Intercept, NMapping, OutDir]).

%% The following function transforms anonymous function mappings passed
%% from an Erlang shell. Anonymous intercept functions from compiled code
Expand Down
3 changes: 2 additions & 1 deletion src/rt_intercept_pt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ form({clause,L,H,G,B}) ->
{clause,L,H,G,forms(B)};
form({match,L,Lhs,Rhs}) ->
{match,L,forms(Lhs),forms(Rhs)};
form({call,L,{remote,_,{atom,_,rt_intercept},{atom,_,add}}=Fun,Args}) ->
form({call,L,{remote,_,{atom,_,rt_intercept},{atom,_,AddFunction}}=Fun,Args})
when AddFunction == add; AddFunction == add_and_save ->
[Node, Intercept] = Args,
{call,L,Fun,[Node,intercept(Intercept)]};
form(F) when is_tuple(F) ->
Expand Down
40 changes: 40 additions & 0 deletions tests/ensemble_basic2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

-module(ensemble_basic2).
-export([confirm/0]).
-compile({parse_transform, rt_intercept_pt}).
-include_lib("eunit/include/eunit.hrl").

confirm() ->
Expand All @@ -33,4 +34,43 @@ confirm() ->
lager:info("Killing all ensemble leaders"),
ok = ensemble_util:kill_leaders(Node, Ensembles),
ensemble_util:wait_until_stable(Node, NVal),
Peers = [PeerId || {PeerId, _PeerPid} <- ensemble_util:peers(Node)],
lager:info("Verifying peers wait for riak_kv_service"),
Delay = rt_config:get(kv_vnode_delay, 5000),
rt_intercept:add_and_save(Node, {riak_kv_vnode, [{{init, 1}, {[Delay],
fun(Args) ->
timer:sleep(Delay),
riak_kv_vnode_orig:init_orig(Args)
end}}]}),
rt:stop_and_wait(Node),
rt:start(Node),
lager:info("Polling peers while riak_kv starts. We should see none"),
UpNoPeersFun =
fun() ->
PL = ensemble_util:peers(Node),
NodePeers = [P || {P, _} <- PL],
NonRootPeers = [P || P <- NodePeers, element(1, P) /= root],
S = rpc:call(Node, riak_core_node_watcher, services, [Node]),
case S of
L when is_list(L) ->
case lists:member(riak_kv, L) of
true ->
true;
false ->
?assertEqual([], NonRootPeers)
end;
Err ->
?assertEqual(ok, {peer_get_error, Err})
end
end,
rt:wait_until(UpNoPeersFun),
lager:info("Perfect. riak_kv is now up and no peers started before that. "
"Now check they come back up"),
SPeers = lists:sort(Peers),
?assertEqual(ok, rt:wait_until(fun() ->
L = ensemble_util:peers(Node),
L2 = lists:sort([P || {P, _} <- L]),
SPeers == L2
end)),
lager:info("All expected peers are back. Life is good"),
pass.
3 changes: 3 additions & 0 deletions tests/ensemble_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ ensembles(Node) ->
get_leader_pid(Node, Ensemble) ->
rpc:call(Node, riak_ensemble_manager, get_leader_pid, [Ensemble]).

peers(Node) ->
rpc:call(Node, riak_ensemble_peer_sup, peers, []).

kill_leader(Node, Ensemble) ->
case get_leader_pid(Node, Ensemble) of
undefined ->
Expand Down

0 comments on commit 3662965

Please sign in to comment.