From 8d24d279974b06f33e5e662fd41b273bd0a4e293 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 6 Mar 2024 10:18:05 +0100 Subject: [PATCH 1/9] Create cets_test_peer helper --- test/cets_SUITE.erl | 40 ++------------------------------------- test/cets_test_peer.erl | 42 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 38 deletions(-) create mode 100644 test/cets_test_peer.erl diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index ee4cef73..643a96d4 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -233,7 +233,7 @@ cets_seq_no_log_cases() -> init_per_suite(Config) -> init_cleanup_table(), Names = [ct2, ct3, ct4, ct5, ct6, ct7], - {Nodes, Peers} = lists:unzip([start_node(N) || N <- Names]), + {Nodes, Peers} = lists:unzip([cets_test_peer:start_node(N) || N <- Names]), [ {nodes, maps:from_list(lists:zip(Names, Nodes))}, {peers, maps:from_list(lists:zip(Names, Peers))} @@ -2922,7 +2922,7 @@ schedule_cleanup(Pid) -> {'DOWN', Ref, process, Me, _} -> %% We do an RPC call, because erlang distribution %% could not be always reliable (because we test netsplits) - rpc(node_to_peer(node(Pid)), cets, stop, [Pid]), + rpc(cets_test_peer:node_to_peer(node(Pid)), cets, stop, [Pid]), ets:delete_object(cleanup_table, {Me, self()}) end end), @@ -3005,42 +3005,6 @@ rpc(Node, M, F, Args) when is_atom(Node) -> Other end. -%% Set epmd_port for better coverage -extra_args(ct2) -> - ["-epmd_port", "4369"]; -extra_args(X) when X == ct5; X == ct6; X == ct7 -> - ["-kernel", "prevent_overlapping_partitions", "false"]; -extra_args(_) -> - "". - -start_node(Sname) -> - {ok, Peer, Node} = ?CT_PEER(#{ - name => Sname, connection => standard_io, args => extra_args(Sname) - }), - %% Register so we can find Peer process later in code - register(node_to_peer_name(Node), Peer), - %% Keep nodes running after init_per_suite is finished - unlink(Peer), - %% Do RPC using alternative connection method - ok = peer:call(Peer, code, add_paths, [code:get_path()]), - {Node, Peer}. - -%% Returns Peer or Node name which could be used to do RPC-s reliably -%% (regardless if Erlang Distribution works or not) -node_to_peer(Node) when Node =:= node() -> - %% There is no peer for the local CT node - Node; -node_to_peer(Node) when is_atom(Node) -> - case whereis(node_to_peer_name(Node)) of - Pid when is_pid(Pid) -> - Pid; - undefined -> - ct:fail({node_to_peer_failed, Node}) - end. - -node_to_peer_name(Node) -> - list_to_atom(atom_to_list(Node) ++ "_peer"). - receive_message(M) -> receive M -> ok diff --git a/test/cets_test_peer.erl b/test/cets_test_peer.erl new file mode 100644 index 00000000..81f86f09 --- /dev/null +++ b/test/cets_test_peer.erl @@ -0,0 +1,42 @@ +-module(cets_test_peer). +-export([ + start_node/1, + node_to_peer/1 +]). +-include_lib("common_test/include/ct.hrl"). + +start_node(Sname) -> + {ok, Peer, Node} = ?CT_PEER(#{ + name => Sname, connection => standard_io, args => extra_args(Sname) + }), + %% Register so we can find Peer process later in code + register(node_to_peer_name(Node), Peer), + %% Keep nodes running after init_per_suite is finished + unlink(Peer), + %% Do RPC using alternative connection method + ok = peer:call(Peer, code, add_paths, [code:get_path()]), + {Node, Peer}. + +%% Returns Peer or Node name which could be used to do RPC-s reliably +%% (regardless if Erlang Distribution works or not) +node_to_peer(Node) when Node =:= node() -> + %% There is no peer for the local CT node + Node; +node_to_peer(Node) when is_atom(Node) -> + case whereis(node_to_peer_name(Node)) of + Pid when is_pid(Pid) -> + Pid; + undefined -> + ct:fail({node_to_peer_failed, Node}) + end. + +node_to_peer_name(Node) -> + list_to_atom(atom_to_list(Node) ++ "_peer"). + +%% Set epmd_port for better coverage +extra_args(ct2) -> + ["-epmd_port", "4369"]; +extra_args(X) when X == ct5; X == ct6; X == ct7 -> + ["-kernel", "prevent_overlapping_partitions", "false"]; +extra_args(_) -> + "". From b009ba6538deabc70eb9ef8bfeb7737be1ef4857 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 6 Mar 2024 10:18:40 +0100 Subject: [PATCH 2/9] Add cets_dist_blocker It prevents nodes from reconnecting too fast --- src/cets_dist_blocker.erl | 153 +++++++++++++++++++++++++++++++ test/cets_dist_blocker_SUITE.erl | 78 ++++++++++++++++ 2 files changed, 231 insertions(+) create mode 100644 src/cets_dist_blocker.erl create mode 100644 test/cets_dist_blocker_SUITE.erl diff --git a/src/cets_dist_blocker.erl b/src/cets_dist_blocker.erl new file mode 100644 index 00000000..18fd4c4b --- /dev/null +++ b/src/cets_dist_blocker.erl @@ -0,0 +1,153 @@ +%% @doc Disallow distributed erlang connections until cleaning is done. +%% +%% This module prevents a node from reconnecting, until cleaning activity is +%% finished. It prevents race conditions. +%% +%% This module assume all nodes share the same cookie. +-module(cets_dist_blocker). +-behaviour(gen_server). +-include_lib("kernel/include/logger.hrl"). + +%% API +-export([ + start_link/0, + add_cleaner/1, + cleaning_done/2 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-ignore_xref([ + start_link/0, + add_cleaner/1, + cleaning_done/2 +]). + +-type cleaner_pid() :: pid(). +-type waiting() :: [{node(), cleaner_pid()}]. + +-type state() :: #{ + cleaners := [cleaner_pid()], + waiting := waiting() +}. + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%% Register Pid as a cleaner. +-spec add_cleaner(pid()) -> ok. +add_cleaner(CleanerPid) -> + gen_server:call(?MODULE, {add_cleaner, CleanerPid}). + +%% Cleaner calls must call this function. +-spec cleaning_done(pid(), node()) -> ok. +cleaning_done(CleanerPid, Node) -> + gen_server:call(?MODULE, {cleaning_done, CleanerPid, Node}). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- +init([]) -> + ok = net_kernel:monitor_nodes(true), + State = #{cleaners => [], waiting => []}, + State2 = lists:foldl(fun handle_nodeup/2, State, nodes()), + {ok, State2}. + +handle_call({add_cleaner, CleanerPid}, _From, State) -> + {reply, ok, handle_add_cleaner(CleanerPid, State)}; +handle_call({cleaning_done, CleanerPid, Node}, _From, State) -> + {reply, ok, maybe_unblock(State, handle_cleaning_done(CleanerPid, Node, State))}; +handle_call(Request, _From, State) -> + ?LOG_ERROR(#{what => unexpected_call, msg => Request}), + {reply, ok, State}. + +handle_cast(Msg, State) -> + ?LOG_ERROR(#{what => unexpected_cast, msg => Msg}), + {noreply, State}. + +handle_info({nodeup, Node}, State) -> + {noreply, handle_nodeup(Node, State)}; +handle_info({nodedown, Node}, State) -> + {noreply, handle_nodedown(Node, State)}; +handle_info({'DOWN', _Ref, process, Pid, _Info}, State) -> + {noreply, maybe_unblock(State, handle_cleaner_down(Pid, State))}; +handle_info(Info, State) -> + ?LOG_ERROR(#{what => unexpected_info, msg => Info}), + {noreply, State}. + +terminate(_Reason, State) -> + %% Restore cookies + _ = maybe_unblock(State, State#{waiting := []}), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% internal functions +%%-------------------------------------------------------------------- + +-spec handle_nodeup(node(), state()) -> state(). +handle_nodeup(Node, State) -> + %% We change the cookie as soon as the node is connected. + %% Alternative is to do it on nodedown, but because nodedown-s are async, + %% we would have a high chance of race conditions (so, node could reconnect + %% before we set cookie). + erlang:set_cookie(Node, blocking_cookie()), + State. + +%% Make cookie, that would prevent node from connecting +-spec blocking_cookie() -> atom(). +blocking_cookie() -> + list_to_atom(atom_to_list(erlang:get_cookie()) ++ "_blocked_by_" ++ atom_to_list(node())). + +%% Allow the node to connect to us again +-spec unblock_node(node(), state()) -> state(). +unblock_node(Node, State) -> + erlang:set_cookie(Node, erlang:get_cookie()), + State. + +-spec handle_nodedown(node(), state()) -> state(). +handle_nodedown(Node, State = #{cleaners := []}) -> + %% Skip waiting when no cleaners + unblock_node(Node, State); +handle_nodedown(Node, State = #{cleaners := Cleaners, waiting := Waiting}) -> + New = [{Node, CleanerPid} || CleanerPid <- Cleaners], + State#{waiting := lists:usort(New ++ Waiting)}. + +-spec handle_add_cleaner(cleaner_pid(), state()) -> state(). +handle_add_cleaner(CleanerPid, State = #{cleaners := Cleaners}) -> + erlang:monitor(process, CleanerPid), + State#{cleaners := lists:usort([CleanerPid | Cleaners])}. + +-spec handle_cleaning_done(cleaner_pid(), node(), state()) -> state(). +handle_cleaning_done(CleanerPid, Node, State = #{waiting := Waiting}) -> + State#{waiting := lists:delete({Node, CleanerPid}, Waiting)}. + +-spec handle_cleaner_down(cleaner_pid(), state()) -> state(). +handle_cleaner_down(CleanerPid, State = #{cleaners := Cleaners, waiting := Waiting}) -> + State#{ + cleaners := lists:delete(CleanerPid, Cleaners), + waiting := [X || {_Node, CleanerPid2} = X <- Waiting, CleanerPid =/= CleanerPid2] + }. + +%% Unblock nodes when the last cleaner confirms the cleaning is done. +%% Call this function each time you remove entries from the waiting list. +-spec maybe_unblock(state(), state()) -> state(). +maybe_unblock(_OldState = #{waiting := OldWaiting}, NewState = #{waiting := NewWaiting}) -> + OldNodes = cast_waiting_to_nodes(OldWaiting), + NewNodes = cast_waiting_to_nodes(NewWaiting), + CleanedNodes = OldNodes -- NewNodes, + lists:foldl(fun unblock_node/2, NewState, CleanedNodes). + +-spec cast_waiting_to_nodes(waiting()) -> [node()]. +cast_waiting_to_nodes(Waiting) -> + lists:usort([Node || {Node, _CleanerPid} <- Waiting]). diff --git a/test/cets_dist_blocker_SUITE.erl b/test/cets_dist_blocker_SUITE.erl new file mode 100644 index 00000000..683186cc --- /dev/null +++ b/test/cets_dist_blocker_SUITE.erl @@ -0,0 +1,78 @@ +-module(cets_dist_blocker_SUITE). +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/logger.hrl"). +-compile([export_all, nowarn_export_all]). + +all() -> + [{group, all}]. + +groups() -> + [{all, [sequence, {repeat_until_any_fail, 2}], all_cases()}]. + +all_cases() -> + [ + dist_blocker_waits_for_cleaning, + dist_blocker_unblocks_if_cleaner_goes_down + ]. + +init_per_suite(Config) -> + Names = [peer_ct2], + {Nodes, Peers} = lists:unzip([cets_test_peer:start_node(N) || N <- Names]), + [ + {nodes, maps:from_list(lists:zip(Names, Nodes))}, + {peers, maps:from_list(lists:zip(Names, Peers))} + | Config + ]. + +end_per_suite(Config) -> + Config. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, Config) -> + Config. + +init_per_testcase(Name, Config) -> + init_per_testcase_generic(Name, Config). + +init_per_testcase_generic(Name, Config) -> + [{testcase, Name} | Config]. + +end_per_testcase(_, _Config) -> + ok. + +dist_blocker_waits_for_cleaning(Config) -> + #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), + {ok, Blocker} = cets_dist_blocker:start_link(), + cets_dist_blocker:add_cleaner(self()), + pong = net_adm:ping(Node2), + true = erlang:disconnect_node(Node2), + %% Connection is blocked + pang = net_adm:ping(Node2), + cets_dist_blocker:cleaning_done(self(), Node2), + %% Connection is unblocked + pong = net_adm:ping(Node2), + gen_server:stop(Blocker). + +dist_blocker_unblocks_if_cleaner_goes_down(Config) -> + Me = self(), + #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), + {ok, Blocker} = cets_dist_blocker:start_link(), + Cleaner = proc_lib:spawn(fun() -> + cets_dist_blocker:add_cleaner(self()), + Me ! added, + timer:sleep(infinity) + end), + receive + added -> ok + after 5000 -> ct:fail(timeout) + end, + pong = net_adm:ping(Node2), + true = erlang:disconnect_node(Node2), + %% Connection is blocked + pang = net_adm:ping(Node2), + erlang:exit(Cleaner, killed), + %% Connection is unblocked + pong = net_adm:ping(Node2), + gen_server:stop(Blocker). From 853c4eead37bd54086ef54789f06aa55af89a6c2 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 6 Mar 2024 10:25:11 +0100 Subject: [PATCH 3/9] Add tests for unknown messages in cets_dist_blocker --- test/cets_dist_blocker_SUITE.erl | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/test/cets_dist_blocker_SUITE.erl b/test/cets_dist_blocker_SUITE.erl index 683186cc..da891206 100644 --- a/test/cets_dist_blocker_SUITE.erl +++ b/test/cets_dist_blocker_SUITE.erl @@ -12,7 +12,11 @@ groups() -> all_cases() -> [ dist_blocker_waits_for_cleaning, - dist_blocker_unblocks_if_cleaner_goes_down + dist_blocker_unblocks_if_cleaner_goes_down, + unknown_down_message_is_ignored, + unknown_message_is_ignored, + unknown_cast_message_is_ignored, + code_change_returns_ok ]. init_per_suite(Config) -> @@ -76,3 +80,29 @@ dist_blocker_unblocks_if_cleaner_goes_down(Config) -> %% Connection is unblocked pong = net_adm:ping(Node2), gen_server:stop(Blocker). + +unknown_down_message_is_ignored(_Config) -> + {ok, Pid} = cets_dist_blocker:start_link(), + RandPid = proc_lib:spawn(fun() -> ok end), + Pid ! {'DOWN', make_ref(), process, RandPid, oops}, + still_works(Pid). + +unknown_message_is_ignored(_Config) -> + {ok, Pid} = cets_dist_blocker:start_link(), + Pid ! oops, + still_works(Pid). + +unknown_cast_message_is_ignored(_Config) -> + {ok, Pid} = cets_dist_blocker:start_link(), + gen_server:cast(Pid, oops), + still_works(Pid). + +code_change_returns_ok(_Config) -> + {ok, Pid} = cets_dist_blocker:start_link(), + sys:suspend(Pid), + ok = sys:change_code(Pid, cets_dist_blocker, v2, []), + sys:resume(Pid), + still_works(Pid). + +still_works(Pid) -> + #{} = sys:get_state(Pid). From 21c5777e2175d011822b5535bfe81e18832465e8 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 6 Mar 2024 11:01:54 +0100 Subject: [PATCH 4/9] Add dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done Add dist_blocker_skip_blocking_if_no_cleaners testcase --- src/cets_dist_blocker.erl | 2 +- test/cets_dist_blocker_SUITE.erl | 56 ++++++++++++++++++++++++++------ 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/cets_dist_blocker.erl b/src/cets_dist_blocker.erl index 18fd4c4b..c2c8fc06 100644 --- a/src/cets_dist_blocker.erl +++ b/src/cets_dist_blocker.erl @@ -67,7 +67,7 @@ handle_call({cleaning_done, CleanerPid, Node}, _From, State) -> {reply, ok, maybe_unblock(State, handle_cleaning_done(CleanerPid, Node, State))}; handle_call(Request, _From, State) -> ?LOG_ERROR(#{what => unexpected_call, msg => Request}), - {reply, ok, State}. + {reply, {error, unexpected_call}, State}. handle_cast(Msg, State) -> ?LOG_ERROR(#{what => unexpected_cast, msg => Msg}), diff --git a/test/cets_dist_blocker_SUITE.erl b/test/cets_dist_blocker_SUITE.erl index da891206..ceb1f230 100644 --- a/test/cets_dist_blocker_SUITE.erl +++ b/test/cets_dist_blocker_SUITE.erl @@ -13,9 +13,12 @@ all_cases() -> [ dist_blocker_waits_for_cleaning, dist_blocker_unblocks_if_cleaner_goes_down, + dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done, + dist_blocker_skip_blocking_if_no_cleaners, unknown_down_message_is_ignored, unknown_message_is_ignored, unknown_cast_message_is_ignored, + unknown_call_returns_error, code_change_returns_ok ]. @@ -60,27 +63,42 @@ dist_blocker_waits_for_cleaning(Config) -> gen_server:stop(Blocker). dist_blocker_unblocks_if_cleaner_goes_down(Config) -> - Me = self(), #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), - Cleaner = proc_lib:spawn(fun() -> - cets_dist_blocker:add_cleaner(self()), - Me ! added, - timer:sleep(infinity) - end), - receive - added -> ok - after 5000 -> ct:fail(timeout) - end, + Cleaner = spawn_cleaner(), + pong = net_adm:ping(Node2), + true = erlang:disconnect_node(Node2), + %% Connection is blocked + pang = net_adm:ping(Node2), + erlang:exit(Cleaner, killed), + %% Connection is unblocked + pong = net_adm:ping(Node2), + gen_server:stop(Blocker). + +dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done(Config) -> + #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), + {ok, Blocker} = cets_dist_blocker:start_link(), + %% Two cleaners + cets_dist_blocker:add_cleaner(self()), + Cleaner = spawn_cleaner(), pong = net_adm:ping(Node2), true = erlang:disconnect_node(Node2), %% Connection is blocked pang = net_adm:ping(Node2), erlang:exit(Cleaner, killed), + cets_dist_blocker:cleaning_done(self(), Node2), %% Connection is unblocked pong = net_adm:ping(Node2), gen_server:stop(Blocker). +dist_blocker_skip_blocking_if_no_cleaners(Config) -> + #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), + {ok, Blocker} = cets_dist_blocker:start_link(), + pong = net_adm:ping(Node2), + true = erlang:disconnect_node(Node2), + pong = net_adm:ping(Node2), + gen_server:stop(Blocker). + unknown_down_message_is_ignored(_Config) -> {ok, Pid} = cets_dist_blocker:start_link(), RandPid = proc_lib:spawn(fun() -> ok end), @@ -97,6 +115,11 @@ unknown_cast_message_is_ignored(_Config) -> gen_server:cast(Pid, oops), still_works(Pid). +unknown_call_returns_error(_Config) -> + {ok, Pid} = cets_dist_blocker:start_link(), + {error, unexpected_call} = gen_server:call(Pid, oops), + still_works(Pid). + code_change_returns_ok(_Config) -> {ok, Pid} = cets_dist_blocker:start_link(), sys:suspend(Pid), @@ -106,3 +129,16 @@ code_change_returns_ok(_Config) -> still_works(Pid) -> #{} = sys:get_state(Pid). + +spawn_cleaner() -> + Me = self(), + Cleaner = proc_lib:spawn(fun() -> + cets_dist_blocker:add_cleaner(self()), + Me ! added, + timer:sleep(infinity) + end), + receive + added -> ok + after 5000 -> ct:fail(timeout) + end, + Cleaner. From 788931ffd8004309dce4f8d2f12628c5d5847f75 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 6 Mar 2024 11:06:01 +0100 Subject: [PATCH 5/9] Improve docs --- src/cets_dist_blocker.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/cets_dist_blocker.erl b/src/cets_dist_blocker.erl index c2c8fc06..7be2e5c1 100644 --- a/src/cets_dist_blocker.erl +++ b/src/cets_dist_blocker.erl @@ -39,15 +39,18 @@ waiting := waiting() }. +%% @doc Spawn `dist_blocker' start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -%% Register Pid as a cleaner. +%% @doc Register CleanerPid as a cleaner. -spec add_cleaner(pid()) -> ok. add_cleaner(CleanerPid) -> gen_server:call(?MODULE, {add_cleaner, CleanerPid}). -%% Cleaner calls must call this function. +%% @doc Confirm that cleaning is done. +%% +%% This function is called by a cleaner after it receives nodedown. -spec cleaning_done(pid(), node()) -> ok. cleaning_done(CleanerPid, Node) -> gen_server:call(?MODULE, {cleaning_done, CleanerPid, Node}). From dfa6add130e09dbafe40e718d75b629701686e6e Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 6 Mar 2024 11:55:27 +0100 Subject: [PATCH 6/9] Add dist_blocker_unblocks_if_cleaner_says_done_and_second_cleaner_goes_down --- test/cets_dist_blocker_SUITE.erl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/cets_dist_blocker_SUITE.erl b/test/cets_dist_blocker_SUITE.erl index ceb1f230..3765d161 100644 --- a/test/cets_dist_blocker_SUITE.erl +++ b/test/cets_dist_blocker_SUITE.erl @@ -14,6 +14,7 @@ all_cases() -> dist_blocker_waits_for_cleaning, dist_blocker_unblocks_if_cleaner_goes_down, dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done, + dist_blocker_unblocks_if_cleaner_says_done_and_second_cleaner_goes_down, dist_blocker_skip_blocking_if_no_cleaners, unknown_down_message_is_ignored, unknown_message_is_ignored, @@ -91,6 +92,23 @@ dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done(Config) pong = net_adm:ping(Node2), gen_server:stop(Blocker). +dist_blocker_unblocks_if_cleaner_says_done_and_second_cleaner_goes_down(Config) -> + #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), + {ok, Blocker} = cets_dist_blocker:start_link(), + %% Two cleaners + cets_dist_blocker:add_cleaner(self()), + Cleaner = spawn_cleaner(), + pong = net_adm:ping(Node2), + true = erlang:disconnect_node(Node2), + %% Connection is blocked + pang = net_adm:ping(Node2), + %% Different order comparing to dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done + cets_dist_blocker:cleaning_done(self(), Node2), + erlang:exit(Cleaner, killed), + %% Connection is unblocked + pong = net_adm:ping(Node2), + gen_server:stop(Blocker). + dist_blocker_skip_blocking_if_no_cleaners(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), From b37ef11470b80708f70c166db5f5a507d5b8f021 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 6 Mar 2024 12:31:50 +0100 Subject: [PATCH 7/9] More strict order control in tests for dist_blocker dist_blocker_blocks_if_cleaner_says_done_and_second_cleaner_does_not_ack testcase Common connect_and_disconnect function --- test/cets_dist_blocker_SUITE.erl | 57 +++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/test/cets_dist_blocker_SUITE.erl b/test/cets_dist_blocker_SUITE.erl index 3765d161..62af5dfa 100644 --- a/test/cets_dist_blocker_SUITE.erl +++ b/test/cets_dist_blocker_SUITE.erl @@ -15,6 +15,7 @@ all_cases() -> dist_blocker_unblocks_if_cleaner_goes_down, dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done, dist_blocker_unblocks_if_cleaner_says_done_and_second_cleaner_goes_down, + dist_blocker_blocks_if_cleaner_says_done_and_second_cleaner_does_not_ack, dist_blocker_skip_blocking_if_no_cleaners, unknown_down_message_is_ignored, unknown_message_is_ignored, @@ -54,11 +55,9 @@ dist_blocker_waits_for_cleaning(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), cets_dist_blocker:add_cleaner(self()), - pong = net_adm:ping(Node2), - true = erlang:disconnect_node(Node2), - %% Connection is blocked - pang = net_adm:ping(Node2), + connect_and_disconnect(Node2), cets_dist_blocker:cleaning_done(self(), Node2), + sync_blocker(Blocker), %% Connection is unblocked pong = net_adm:ping(Node2), gen_server:stop(Blocker). @@ -67,11 +66,9 @@ dist_blocker_unblocks_if_cleaner_goes_down(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), Cleaner = spawn_cleaner(), - pong = net_adm:ping(Node2), - true = erlang:disconnect_node(Node2), - %% Connection is blocked - pang = net_adm:ping(Node2), + connect_and_disconnect(Node2), erlang:exit(Cleaner, killed), + sync_blocker(Blocker), %% Connection is unblocked pong = net_adm:ping(Node2), gen_server:stop(Blocker). @@ -82,12 +79,11 @@ dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done(Config) %% Two cleaners cets_dist_blocker:add_cleaner(self()), Cleaner = spawn_cleaner(), - pong = net_adm:ping(Node2), - true = erlang:disconnect_node(Node2), - %% Connection is blocked - pang = net_adm:ping(Node2), + connect_and_disconnect(Node2), erlang:exit(Cleaner, killed), + wait_for_waiting_count(Blocker, 1), cets_dist_blocker:cleaning_done(self(), Node2), + sync_blocker(Blocker), %% Connection is unblocked pong = net_adm:ping(Node2), gen_server:stop(Blocker). @@ -98,17 +94,30 @@ dist_blocker_unblocks_if_cleaner_says_done_and_second_cleaner_goes_down(Config) %% Two cleaners cets_dist_blocker:add_cleaner(self()), Cleaner = spawn_cleaner(), - pong = net_adm:ping(Node2), - true = erlang:disconnect_node(Node2), - %% Connection is blocked - pang = net_adm:ping(Node2), + connect_and_disconnect(Node2), %% Different order comparing to dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done cets_dist_blocker:cleaning_done(self(), Node2), + wait_for_waiting_count(Blocker, 1), erlang:exit(Cleaner, killed), + sync_blocker(Blocker), %% Connection is unblocked pong = net_adm:ping(Node2), gen_server:stop(Blocker). +dist_blocker_blocks_if_cleaner_says_done_and_second_cleaner_does_not_ack(Config) -> + #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), + {ok, Blocker} = cets_dist_blocker:start_link(), + %% Two cleaners + cets_dist_blocker:add_cleaner(self()), + Cleaner = spawn_cleaner(), + connect_and_disconnect(Node2), + cets_dist_blocker:cleaning_done(self(), Node2), + wait_for_waiting_count(Blocker, 1), + sync_blocker(Blocker), + %% Connection is still blocked + pang = net_adm:ping(Node2), + gen_server:stop(Blocker). + dist_blocker_skip_blocking_if_no_cleaners(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), @@ -160,3 +169,19 @@ spawn_cleaner() -> after 5000 -> ct:fail(timeout) end, Cleaner. + +%% Wait for the blocker to process pending nodeup/nodedown messages +sync_blocker(Blocker) -> + sys:get_state(Blocker), + ok. + +wait_for_waiting_count(Blocker, Count) -> + F = fun() -> length(maps:get(waiting, sys:get_state(Blocker))) end, + cets_test_wait:wait_until(F, Count). + +connect_and_disconnect(Node2) -> + pong = net_adm:ping(Node2), + true = erlang:disconnect_node(Node2), + %% Connection is blocked + pang = net_adm:ping(Node2), + ok. From 43f5298edbcf4165f5fb5b064b14e2d4e58e0552 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 6 Mar 2024 12:55:11 +0100 Subject: [PATCH 8/9] Use two groups of tests in cets_dist_blocker_SUITE --- test/cets_dist_blocker_SUITE.erl | 50 +++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/test/cets_dist_blocker_SUITE.erl b/test/cets_dist_blocker_SUITE.erl index 62af5dfa..5353cbb5 100644 --- a/test/cets_dist_blocker_SUITE.erl +++ b/test/cets_dist_blocker_SUITE.erl @@ -4,19 +4,29 @@ -compile([export_all, nowarn_export_all]). all() -> - [{group, all}]. + [ + {group, blocker}, + {group, unknown} + ]. groups() -> - [{all, [sequence, {repeat_until_any_fail, 2}], all_cases()}]. + [ + {blocker, [sequence, {repeat_until_any_fail, 2}], blocker_cases()}, + {unknown, [sequence, {repeat_until_any_fail, 2}], unknown_cases()} + ]. -all_cases() -> +blocker_cases() -> + [ + waits_for_cleaning, + unblocks_if_cleaner_goes_down, + unblocks_if_cleaner_goes_down_and_second_cleaner_says_done, + unblocks_if_cleaner_says_done_and_second_cleaner_goes_down, + blocks_if_cleaner_says_done_and_second_cleaner_does_not_ack, + skip_blocking_if_no_cleaners + ]. + +unknown_cases() -> [ - dist_blocker_waits_for_cleaning, - dist_blocker_unblocks_if_cleaner_goes_down, - dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done, - dist_blocker_unblocks_if_cleaner_says_done_and_second_cleaner_goes_down, - dist_blocker_blocks_if_cleaner_says_done_and_second_cleaner_does_not_ack, - dist_blocker_skip_blocking_if_no_cleaners, unknown_down_message_is_ignored, unknown_message_is_ignored, unknown_cast_message_is_ignored, @@ -51,7 +61,9 @@ init_per_testcase_generic(Name, Config) -> end_per_testcase(_, _Config) -> ok. -dist_blocker_waits_for_cleaning(Config) -> +%% Test blocking functionality + +waits_for_cleaning(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), cets_dist_blocker:add_cleaner(self()), @@ -62,7 +74,7 @@ dist_blocker_waits_for_cleaning(Config) -> pong = net_adm:ping(Node2), gen_server:stop(Blocker). -dist_blocker_unblocks_if_cleaner_goes_down(Config) -> +unblocks_if_cleaner_goes_down(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), Cleaner = spawn_cleaner(), @@ -73,7 +85,7 @@ dist_blocker_unblocks_if_cleaner_goes_down(Config) -> pong = net_adm:ping(Node2), gen_server:stop(Blocker). -dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done(Config) -> +unblocks_if_cleaner_goes_down_and_second_cleaner_says_done(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), %% Two cleaners @@ -88,14 +100,14 @@ dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done(Config) pong = net_adm:ping(Node2), gen_server:stop(Blocker). -dist_blocker_unblocks_if_cleaner_says_done_and_second_cleaner_goes_down(Config) -> +unblocks_if_cleaner_says_done_and_second_cleaner_goes_down(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), %% Two cleaners cets_dist_blocker:add_cleaner(self()), Cleaner = spawn_cleaner(), connect_and_disconnect(Node2), - %% Different order comparing to dist_blocker_unblocks_if_cleaner_goes_down_and_second_cleaner_says_done + %% Different order comparing to unblocks_if_cleaner_goes_down_and_second_cleaner_says_done cets_dist_blocker:cleaning_done(self(), Node2), wait_for_waiting_count(Blocker, 1), erlang:exit(Cleaner, killed), @@ -104,12 +116,12 @@ dist_blocker_unblocks_if_cleaner_says_done_and_second_cleaner_goes_down(Config) pong = net_adm:ping(Node2), gen_server:stop(Blocker). -dist_blocker_blocks_if_cleaner_says_done_and_second_cleaner_does_not_ack(Config) -> +blocks_if_cleaner_says_done_and_second_cleaner_does_not_ack(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), %% Two cleaners cets_dist_blocker:add_cleaner(self()), - Cleaner = spawn_cleaner(), + _Cleaner = spawn_cleaner(), connect_and_disconnect(Node2), cets_dist_blocker:cleaning_done(self(), Node2), wait_for_waiting_count(Blocker, 1), @@ -118,7 +130,7 @@ dist_blocker_blocks_if_cleaner_says_done_and_second_cleaner_does_not_ack(Config) pang = net_adm:ping(Node2), gen_server:stop(Blocker). -dist_blocker_skip_blocking_if_no_cleaners(Config) -> +skip_blocking_if_no_cleaners(Config) -> #{peer_ct2 := Node2} = proplists:get_value(nodes, Config), {ok, Blocker} = cets_dist_blocker:start_link(), pong = net_adm:ping(Node2), @@ -126,6 +138,8 @@ dist_blocker_skip_blocking_if_no_cleaners(Config) -> pong = net_adm:ping(Node2), gen_server:stop(Blocker). +%% Cover unknown message handling / code_change + unknown_down_message_is_ignored(_Config) -> {ok, Pid} = cets_dist_blocker:start_link(), RandPid = proc_lib:spawn(fun() -> ok end), @@ -154,6 +168,8 @@ code_change_returns_ok(_Config) -> sys:resume(Pid), still_works(Pid). +%% Helpers + still_works(Pid) -> #{} = sys:get_state(Pid). From b14d8b85b659336dc316c3fd30e71cbbdbce6a63 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 8 Mar 2024 16:51:00 +0100 Subject: [PATCH 9/9] Fix comments after the review --- src/cets_dist_blocker.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cets_dist_blocker.erl b/src/cets_dist_blocker.erl index 7be2e5c1..392234a2 100644 --- a/src/cets_dist_blocker.erl +++ b/src/cets_dist_blocker.erl @@ -3,7 +3,7 @@ %% This module prevents a node from reconnecting, until cleaning activity is %% finished. It prevents race conditions. %% -%% This module assume all nodes share the same cookie. +%% This module assumes all nodes share the same cookie. -module(cets_dist_blocker). -behaviour(gen_server). -include_lib("kernel/include/logger.hrl"). @@ -50,7 +50,8 @@ add_cleaner(CleanerPid) -> %% @doc Confirm that cleaning is done. %% -%% This function is called by a cleaner after it receives nodedown. +%% This function should be called by a cleaner when it receives +%% nodedown and finishes cleaning. -spec cleaning_done(pid(), node()) -> ok. cleaning_done(CleanerPid, Node) -> gen_server:call(?MODULE, {cleaning_done, CleanerPid, Node}).