Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dist blocker #51

Merged
merged 9 commits into from
Mar 8, 2024
156 changes: 156 additions & 0 deletions src/cets_dist_blocker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
%% @doc Disallow distributed erlang connections until cleaning is done.
%%
%% This module prevents a node from reconnecting, until cleaning activity is
%% finished. It prevents race conditions.
%%
%% This module assume all nodes share the same cookie.
chrzaszcz marked this conversation as resolved.
Show resolved Hide resolved
-module(cets_dist_blocker).
-behaviour(gen_server).
-include_lib("kernel/include/logger.hrl").

%% API
-export([
start_link/0,
add_cleaner/1,
cleaning_done/2
]).

%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-ignore_xref([
start_link/0,
add_cleaner/1,
cleaning_done/2
]).

-type cleaner_pid() :: pid().
-type waiting() :: [{node(), cleaner_pid()}].

-type state() :: #{
cleaners := [cleaner_pid()],
waiting := waiting()
}.

%% @doc Spawn `dist_blocker'
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc Register CleanerPid as a cleaner.
-spec add_cleaner(pid()) -> ok.
add_cleaner(CleanerPid) ->
gen_server:call(?MODULE, {add_cleaner, CleanerPid}).

%% @doc Confirm that cleaning is done.
%%
%% This function is called by a cleaner after it receives nodedown.
chrzaszcz marked this conversation as resolved.
Show resolved Hide resolved
-spec cleaning_done(pid(), node()) -> ok.
cleaning_done(CleanerPid, Node) ->
gen_server:call(?MODULE, {cleaning_done, CleanerPid, Node}).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
ok = net_kernel:monitor_nodes(true),
State = #{cleaners => [], waiting => []},
State2 = lists:foldl(fun handle_nodeup/2, State, nodes()),
{ok, State2}.

handle_call({add_cleaner, CleanerPid}, _From, State) ->
{reply, ok, handle_add_cleaner(CleanerPid, State)};
handle_call({cleaning_done, CleanerPid, Node}, _From, State) ->
{reply, ok, maybe_unblock(State, handle_cleaning_done(CleanerPid, Node, State))};
handle_call(Request, _From, State) ->
?LOG_ERROR(#{what => unexpected_call, msg => Request}),
{reply, {error, unexpected_call}, State}.

handle_cast(Msg, State) ->
?LOG_ERROR(#{what => unexpected_cast, msg => Msg}),
{noreply, State}.

handle_info({nodeup, Node}, State) ->
{noreply, handle_nodeup(Node, State)};
handle_info({nodedown, Node}, State) ->
{noreply, handle_nodedown(Node, State)};
handle_info({'DOWN', _Ref, process, Pid, _Info}, State) ->
{noreply, maybe_unblock(State, handle_cleaner_down(Pid, State))};
handle_info(Info, State) ->
?LOG_ERROR(#{what => unexpected_info, msg => Info}),
{noreply, State}.

terminate(_Reason, State) ->
%% Restore cookies
_ = maybe_unblock(State, State#{waiting := []}),
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%--------------------------------------------------------------------
%% internal functions
%%--------------------------------------------------------------------

-spec handle_nodeup(node(), state()) -> state().
handle_nodeup(Node, State) ->
%% We change the cookie as soon as the node is connected.
%% Alternative is to do it on nodedown, but because nodedown-s are async,
%% we would have a high chance of race conditions (so, node could reconnect
%% before we set cookie).
erlang:set_cookie(Node, blocking_cookie()),
State.

%% Make cookie, that would prevent node from connecting
-spec blocking_cookie() -> atom().
blocking_cookie() ->
list_to_atom(atom_to_list(erlang:get_cookie()) ++ "_blocked_by_" ++ atom_to_list(node())).

%% Allow the node to connect to us again
-spec unblock_node(node(), state()) -> state().
unblock_node(Node, State) ->
erlang:set_cookie(Node, erlang:get_cookie()),
State.

-spec handle_nodedown(node(), state()) -> state().
handle_nodedown(Node, State = #{cleaners := []}) ->
%% Skip waiting when no cleaners
unblock_node(Node, State);
handle_nodedown(Node, State = #{cleaners := Cleaners, waiting := Waiting}) ->
New = [{Node, CleanerPid} || CleanerPid <- Cleaners],
State#{waiting := lists:usort(New ++ Waiting)}.

-spec handle_add_cleaner(cleaner_pid(), state()) -> state().
handle_add_cleaner(CleanerPid, State = #{cleaners := Cleaners}) ->
erlang:monitor(process, CleanerPid),
State#{cleaners := lists:usort([CleanerPid | Cleaners])}.

-spec handle_cleaning_done(cleaner_pid(), node(), state()) -> state().
handle_cleaning_done(CleanerPid, Node, State = #{waiting := Waiting}) ->
State#{waiting := lists:delete({Node, CleanerPid}, Waiting)}.

-spec handle_cleaner_down(cleaner_pid(), state()) -> state().
handle_cleaner_down(CleanerPid, State = #{cleaners := Cleaners, waiting := Waiting}) ->
State#{
cleaners := lists:delete(CleanerPid, Cleaners),
waiting := [X || {_Node, CleanerPid2} = X <- Waiting, CleanerPid =/= CleanerPid2]
}.

%% Unblock nodes when the last cleaner confirms the cleaning is done.
%% Call this function each time you remove entries from the waiting list.
-spec maybe_unblock(state(), state()) -> state().
maybe_unblock(_OldState = #{waiting := OldWaiting}, NewState = #{waiting := NewWaiting}) ->
OldNodes = cast_waiting_to_nodes(OldWaiting),
NewNodes = cast_waiting_to_nodes(NewWaiting),
CleanedNodes = OldNodes -- NewNodes,
lists:foldl(fun unblock_node/2, NewState, CleanedNodes).

-spec cast_waiting_to_nodes(waiting()) -> [node()].
cast_waiting_to_nodes(Waiting) ->
lists:usort([Node || {Node, _CleanerPid} <- Waiting]).
40 changes: 2 additions & 38 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ cets_seq_no_log_cases() ->
init_per_suite(Config) ->
init_cleanup_table(),
Names = [ct2, ct3, ct4, ct5, ct6, ct7],
{Nodes, Peers} = lists:unzip([start_node(N) || N <- Names]),
{Nodes, Peers} = lists:unzip([cets_test_peer:start_node(N) || N <- Names]),
[
{nodes, maps:from_list(lists:zip(Names, Nodes))},
{peers, maps:from_list(lists:zip(Names, Peers))}
Expand Down Expand Up @@ -2922,7 +2922,7 @@ schedule_cleanup(Pid) ->
{'DOWN', Ref, process, Me, _} ->
%% We do an RPC call, because erlang distribution
%% could not be always reliable (because we test netsplits)
rpc(node_to_peer(node(Pid)), cets, stop, [Pid]),
rpc(cets_test_peer:node_to_peer(node(Pid)), cets, stop, [Pid]),
ets:delete_object(cleanup_table, {Me, self()})
end
end),
Expand Down Expand Up @@ -3005,42 +3005,6 @@ rpc(Node, M, F, Args) when is_atom(Node) ->
Other
end.

%% Set epmd_port for better coverage
extra_args(ct2) ->
["-epmd_port", "4369"];
extra_args(X) when X == ct5; X == ct6; X == ct7 ->
["-kernel", "prevent_overlapping_partitions", "false"];
extra_args(_) ->
"".

start_node(Sname) ->
{ok, Peer, Node} = ?CT_PEER(#{
name => Sname, connection => standard_io, args => extra_args(Sname)
}),
%% Register so we can find Peer process later in code
register(node_to_peer_name(Node), Peer),
%% Keep nodes running after init_per_suite is finished
unlink(Peer),
%% Do RPC using alternative connection method
ok = peer:call(Peer, code, add_paths, [code:get_path()]),
{Node, Peer}.

%% Returns Peer or Node name which could be used to do RPC-s reliably
%% (regardless if Erlang Distribution works or not)
node_to_peer(Node) when Node =:= node() ->
%% There is no peer for the local CT node
Node;
node_to_peer(Node) when is_atom(Node) ->
case whereis(node_to_peer_name(Node)) of
Pid when is_pid(Pid) ->
Pid;
undefined ->
ct:fail({node_to_peer_failed, Node})
end.

node_to_peer_name(Node) ->
list_to_atom(atom_to_list(Node) ++ "_peer").

receive_message(M) ->
receive
M -> ok
Expand Down
Loading
Loading