diff --git a/src/ra.hrl b/src/ra.hrl
index cef26c04..5f70d38f 100644
--- a/src/ra.hrl
+++ b/src/ra.hrl
@@ -70,7 +70,8 @@
voter_status => ra_voter_status(),
%% indicates that a snapshot is being sent
%% to the peer
- status := ra_peer_status()}.
+ status := ra_peer_status(),
+ machine_version => ra_machine:version()}.
-type ra_cluster() :: #{ra_server_id() => ra_peer_state()}.
@@ -187,6 +188,17 @@
{query_index :: integer(),
term :: ra_term()}).
+-record(info_rpc,
+ {from :: ra_server_id(),
+ term :: ra_term(),
+ keys :: [ra_server:ra_server_info_key()]}).
+
+-record(info_reply,
+ {from :: ra_server_id(),
+ term :: ra_term(),
+ keys :: [ra_server:ra_server_info_key()],
+ info = #{} :: ra_server:ra_server_info()}).
+
%% WAL defaults
-define(WAL_DEFAULT_MAX_SIZE_BYTES, 256 * 1000 * 1000).
-define(WAL_DEFAULT_MAX_BATCH_SIZE, 8192).
diff --git a/src/ra_server.erl b/src/ra_server.erl
index bffa31cd..36fa5720 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -146,6 +146,8 @@
ra_log:event() |
{consistent_query, term(), ra:query_fun()} |
#heartbeat_rpc{} |
+ #info_rpc{} |
+ #info_reply{} |
{ra_server_id, #heartbeat_reply{}} |
pipeline_rpcs.
@@ -192,6 +194,18 @@
%% The simple machine config is version that can only be used for simple state
%% machines that cannot access any of the advanced features.
+-type machine_upgrade_strategy() :: all | quorum.
+%% When a new Ra machine version should be upgraded to.
+%%
+%%
+%% - `all': a new machine version is used only when all members of the
+%% cluster support it. This is the default behavior starting with Ra
+%% 2.16.
+%% - `quorum': when a leader is elected and if it supports a newer machine
+%% version, it switches to that new version. This was the default and only
+%% possible behavior in Ra up to 2.15.
+%%
+
-type ra_server_config() :: #{id := ra_server_id(),
uid := ra_uid(),
%% a friendly name to refer to a particular
@@ -219,6 +233,27 @@
has_changed => boolean()
}.
+-type ra_server_info_key() :: machine_version | atom().
+%% Key one can get in `ra_server_info()'.
+%%
+%% This is used in the `#info_rpc{}' to ask for a specific list of info keys.
+%%
+%% Key meanings:
+%%
+%% - `machine_version': Highest machine version supported by this Ra
+%% server.
+%%
+%%
+%% Any atom is supported because a future version of Ra could ask for
+%% something this version does not know about.
+
+-type ra_server_info() :: #{machine_version => ra_machine:version(),
+ atom() => any()}.
+%% Info for a Ra server, got from `#info_reply{}'.
+%%
+%% In addition, the map may contain keys unknown to this version of Ra but
+%% emitted by a future version.
+
-type mutable_config() :: #{cluster_name => ra_cluster_name(),
metrics_key => term(),
broadcast_time => non_neg_integer(), % ms
@@ -235,6 +270,8 @@
ra_server_state/0,
ra_state/0,
ra_server_config/0,
+ ra_server_info_key/0,
+ ra_server_info/0,
mutable_config/0,
ra_msg/0,
machine_conf/0,
@@ -246,7 +283,8 @@
command_reply_mode/0,
ra_event_formatter_fun/0,
effect/0,
- effects/0
+ effects/0,
+ machine_upgrade_strategy/0
]).
-spec name(ClusterName :: ra_cluster_name(), UniqueSuffix::string()) -> atom().
@@ -814,6 +852,29 @@ handle_leader(#request_vote_result{}, State) ->
handle_leader(#pre_vote_result{}, State) ->
%% handle to avoid logging as unhandled
{leader, State, []};
+handle_leader(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId}} = State0)
+ when CurTerm < Term ->
+ ?INFO("~ts: leader saw info_rpc from ~w for term ~b, abdicates term: ~b!",
+ [LogId, Msg#info_rpc.from, Term, CurTerm]),
+ {follower, update_term(Term, State0#{leader_id => undefined}),
+ [{next_event, Msg}]};
+handle_leader(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = info_reply_effect(State, InfoRpc),
+ {leader, State, [InfoReplyEffect]};
+handle_leader(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId}} = State0)
+ when CurTerm < Term ->
+ ?INFO("~ts: leader saw info_reply from ~w for term ~b, abdicates "
+ "term: ~b!",
+ [LogId, Msg#info_reply.from, Term, CurTerm]),
+ {follower, update_term(Term, State0#{leader_id => undefined}),
+ [{next_event, Msg}]};
+handle_leader(#info_reply{} = InfoReply, State) ->
+ {State1, Effects} = handle_info_reply(State, InfoReply),
+ {leader, State1, Effects};
handle_leader({transfer_leadership, Leader},
#{cfg := #cfg{id = Leader, log_id = LogId}} = State) ->
?DEBUG("~ts: transfer leadership requested but already leader",
@@ -860,8 +921,7 @@ handle_leader(Msg, State) ->
{ra_state(), ra_server_state(), effects()}.
handle_candidate(#request_vote_result{term = Term, vote_granted = true},
#{cfg := #cfg{id = Id,
- log_id = LogId,
- machine = Mac},
+ log_id = LogId},
current_term := Term,
votes := Votes,
cluster := Nodes} = State0) ->
@@ -871,11 +931,10 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
case required_quorum(Nodes) of
NewVotes ->
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
- Noop = {noop, #{ts => erlang:system_time(millisecond)},
- ra_machine:version(Mac)},
State = State1#{leader_id => Id},
+ PostElectionEffects = post_election_effects(State),
{leader, maps:without([votes], State),
- [{next_event, cast, {command, Noop}} | Effects]};
+ PostElectionEffects ++ Effects};
_ ->
{candidate, State0#{votes => NewVotes}, []}
end;
@@ -966,6 +1025,27 @@ handle_candidate({register_external_log_reader, Pid}, #{log := Log0} = State) ->
{candidate, State#{log => Log}, Effs};
handle_candidate(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
+handle_candidate(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId}} = State0)
+ when CurTerm < Term ->
+ ?INFO("~ts: candidate info_rpc with higher term received ~b -> ~b",
+ [LogId, CurTerm, Term]),
+ State = update_term_and_voted_for(Term, undefined, State0),
+ {follower, State, [{next_event, Msg}]};
+handle_candidate(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = empty_info_reply_effect(State, InfoRpc),
+ {candidate, State, [InfoReplyEffect]};
+handle_candidate(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId}} = State0)
+ when CurTerm < Term ->
+ ?INFO("~ts: candidate info_reply with higher term received ~b -> ~b",
+ [LogId, CurTerm, Term]),
+ State = update_term_and_voted_for(Term, undefined, State0),
+ {follower, State, [{next_event, Msg}]};
+handle_candidate(#info_reply{}, State) ->
+ {candidate, State, []};
handle_candidate(Msg, State) ->
log_unhandled_msg(candidate, Msg, State),
{candidate, State, [{reply, {error, {unsupported_call, Msg}}}]}.
@@ -1048,6 +1128,19 @@ handle_pre_vote({register_external_log_reader, Pid}, #{log := Log0} = State) ->
{pre_vote, State#{log => Log}, Effs};
handle_pre_vote(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
+handle_pre_vote(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm} = State0)
+ when CurTerm < Term ->
+ {follower, State0#{votes => 0}, [{next_event, Msg}]};
+handle_pre_vote(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = empty_info_reply_effect(State, InfoRpc),
+ {pre_vote, State, [InfoReplyEffect]};
+handle_pre_vote(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm} = State0)
+ when CurTerm < Term ->
+ {follower, State0#{votes => 0}, [{next_event, Msg}]};
+handle_pre_vote(#info_reply{}, State) ->
+ {pre_vote, State, []};
handle_pre_vote(Msg, State) ->
log_unhandled_msg(pre_vote, Msg, State),
{pre_vote, State, [{reply, {error, {unsupported_call, Msg}}}]}.
@@ -1349,6 +1442,21 @@ handle_follower(force_member_change,
{ok, _, _, State, Effects} =
append_cluster_change(Cluster, undefined, no_reply, State0, []),
call_for_election(pre_vote, State, [{reply, ok} | Effects]);
+handle_follower(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm} = State)
+ when CurTerm < Term ->
+ State1 = update_term(Term, State),
+ {follower, State1, [{next_event, Msg}]};
+handle_follower(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = info_reply_effect(State, InfoRpc),
+ {follower, State, [InfoReplyEffect]};
+handle_follower(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm} = State)
+ when CurTerm < Term ->
+ State1 = update_term(Term, State),
+ {follower, State1, [{next_event, Msg}]};
+handle_follower(#info_reply{}, State) ->
+ {follower, State, []};
handle_follower(Msg, State) ->
log_unhandled_msg(follower, Msg, State),
{follower, State, [{reply, {error, {unsupported_call, Msg}}}]}.
@@ -1465,6 +1573,39 @@ handle_receive_snapshot(receive_snapshot_timeout, #{log := Log0} = State) ->
handle_receive_snapshot({register_external_log_reader, Pid}, #{log := Log0} = State) ->
{Log, Effs} = ra_log:register_reader(Pid, Log0),
{receive_snapshot, State#{log => Log}, Effs};
+handle_receive_snapshot(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId},
+ log := Log0} = State)
+ when CurTerm < Term ->
+ ?INFO("~ts: follower receiving snapshot saw info_rpc from ~w for term ~b "
+ "abdicates term: ~b!",
+ [LogId, Msg#info_rpc.from,
+ Term, CurTerm]),
+ SnapState0 = ra_log:snapshot_state(Log0),
+ SnapState = ra_snapshot:abort_accept(SnapState0),
+ Log = ra_log:set_snapshot_state(SnapState, Log0),
+ {follower, update_term(Term, clear_leader_id(State#{log => Log})),
+ [{next_event, Msg}]};
+handle_receive_snapshot(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = empty_info_reply_effect(State, InfoRpc),
+ {receive_snapshot, State, [InfoReplyEffect]};
+handle_receive_snapshot(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId},
+ log := Log0} = State)
+ when CurTerm < Term ->
+ ?INFO("~ts: follower receiving snapshot saw info_reply from ~w for term ~b "
+ "abdicates term: ~b!",
+ [LogId, Msg#info_reply.from,
+ Term, CurTerm]),
+ SnapState0 = ra_log:snapshot_state(Log0),
+ SnapState = ra_snapshot:abort_accept(SnapState0),
+ Log = ra_log:set_snapshot_state(SnapState, Log0),
+ {follower, update_term(Term, clear_leader_id(State#{log => Log})),
+ [{next_event, Msg}]};
+handle_receive_snapshot(#info_reply{}, State) ->
+ {receive_snapshot, State, []};
handle_receive_snapshot(Msg, State) ->
log_unhandled_msg(receive_snapshot, Msg, State),
%% drop all other events??
@@ -1533,9 +1674,10 @@ process_new_leader_queries(#{pending_consistent_queries := Pending,
-spec tick(ra_server_state()) -> effects().
tick(#{cfg := #cfg{effective_machine_module = MacMod},
- machine_state := MacState}) ->
+ machine_state := MacState} = State) ->
+ InfoRpcEffects = info_rpc_effects(State),
Now = erlang:system_time(millisecond),
- ra_machine:tick(MacMod, Now, MacState).
+ InfoRpcEffects ++ ra_machine:tick(MacMod, Now, MacState).
-spec log_tick(ra_server_state()) -> ra_server_state().
log_tick(#{cfg := #cfg{},
@@ -3314,6 +3456,178 @@ after_log_append_reply(Cmd, Idx, Term, Effects0) ->
Effects0
end.
+post_election_effects(
+ #{cfg := #cfg{effective_machine_version = EffectiveMacVer,
+ machine = Mac,
+ system_config = SystemConfig}} = State) ->
+ Peers = peers(State),
+ PeerIds = maps:keys(Peers),
+
+ MachineUpgradeStrategy = maps:get(machine_upgrade_strategy, SystemConfig,
+ ?DEFAULT_MACHINE_UPGRADE_STRATEGY),
+ UpgradeMachineNow = (PeerIds =:= [] orelse
+ MachineUpgradeStrategy =:= quorum),
+
+ case UpgradeMachineNow of
+ true ->
+ %% Either this node has no peers or the `quorum' strategy is
+ %% configured: send the `noop' with the machine's own version now.
+ MacVer = ra_machine:version(Mac),
+ Noop = {noop, #{ts => erlang:system_time(millisecond)},
+ MacVer},
+ NoopEffect = {next_event, cast, {command, Noop}},
+ [NoopEffect];
+ false ->
+ %% We continue to send the `noop' command immediately after
+ %% becoming a leader, but compared to Ra 2.15 and older, we don't
+ %% set the machine version to the latest: we keep the same
+ %% effective machine version for now.
+ %%
+ %% However, we query info keys from all peers, including their
+ %% supported machine version. The replies will be used to
+ %% determine the max supported machine version and when it is
+ %% greater than the effective one, another `noop' command will be
+ %% sent.
+ Noop = {noop, #{ts => erlang:system_time(millisecond)},
+ EffectiveMacVer},
+ NoopEffect = {next_event, cast, {command, Noop}},
+ InfoRpcEffects = info_rpc_effects(State),
+ [NoopEffect | InfoRpcEffects]
+ end.
+
+%% Builds the `#info_rpc{}' effects for every cluster member except this
+%% server, in cluster iteration order. Per-peer effects are prepended to the
+%% accumulator and reversed once, instead of the quadratic `Acc ++ Effects'.
+info_rpc_effects(#{cfg := #cfg{id = Id}, cluster := Cluster} = State) ->
+    lists:reverse(maps:fold(
+      fun(PeerId, _, Acc) when PeerId =:= Id -> Acc;
+         (PeerId, _, Acc) ->
+              lists:reverse(info_rpc_effects_for_peer(State, PeerId), Acc)
+      end, [], Cluster)).
+
+%% Returns the `send_rpc' effect asking `PeerId' for its info, or no effect
+%% when nothing needs to be (re-)requested from that peer. The request is
+%% stamped with this server's id and current term.
+info_rpc_effects_for_peer(
+  #{cluster := Cluster, current_term := CurTerm} = State, PeerId) ->
+    %% A peer is up-to-date once it reported a machine version at least as
+    %% recent as the one returned by `machine_version/1' for this server.
+    UpToDate = case Cluster of
+                   #{PeerId := #{machine_version := PeerMacVer}} ->
+                       %% An older version is asked again (see below), in
+                       %% the hope the peer restarted and was updated.
+                       PeerMacVer >= machine_version(State);
+                   _ ->
+                       %% Nothing is known about this peer yet.
+                       false
+               end,
+    case UpToDate of
+        true ->
+            [];
+        false ->
+            %% Only `machine_version' is requested for now. Asking for
+            %% varying key sets would require merging the already known
+            %% info keys with the updates.
+            InfoRpc = #info_rpc{from = id(State),
+                                term = CurTerm,
+                                keys = [machine_version]},
+            [{send_rpc, PeerId, InfoRpc}]
+    end.
+
+%% Effect casting an `#info_reply{}' back to the sender of the `#info_rpc{}',
+%% filled with the requested info keys known to this server.
+info_reply_effect(#{current_term := CurTerm} = State,
+                  #info_rpc{from = Requester, keys = Wanted}) ->
+    Reply = #info_reply{from = id(State),
+                        term = CurTerm,
+                        keys = Wanted,
+                        info = ra_server_info(State, Wanted)},
+    {cast, Requester, Reply}.
+
+%% Like `info_reply_effect/2' but always replies with an empty info map.
+empty_info_reply_effect(#{current_term := CurTerm} = State,
+                        #info_rpc{from = Requester, keys = Wanted}) ->
+    Reply = #info_reply{from = id(State),
+                        term = CurTerm,
+                        keys = Wanted,
+                        info = #{}},
+    {cast, Requester, Reply}.
+
+%% Every info key this server can report; only `machine_version' for now.
+ra_server_info(State) ->
+    #{machine_version => machine_version(State)}.
+
+%% Info of this server restricted to the requested `Keys'.
+%%
+%% The requesting node may ask for keys this version does not know about;
+%% those are simply absent from the returned map.
+ra_server_info(State, Keys) ->
+    KnownInfo = ra_server_info(State),
+    Requested = maps:with(Keys, KnownInfo),
+    Requested.
+
+%% Merges an `#info_reply{}' into the sender's peer state and re-evaluates
+%% whether a machine upgrade is possible. Replies from servers that are not
+%% (or no longer) cluster members are ignored instead of raising `badkey'.
+handle_info_reply(#{cluster := Cluster} = State,
+                  #info_reply{from = PeerId, keys = Keys, info = Info})
+  when is_map_key(PeerId, Cluster) ->
+    #{PeerId := PeerState0} = Cluster,
+    %% A requested key missing from `Info' is dropped from the peer state:
+    %% the peer could not answer it (e.g. it sent an empty info reply).
+    PeerState = maps:merge(maps:without(Keys, PeerState0),
+                           maps:with(Keys, Info)),
+    State1 = State#{cluster => Cluster#{PeerId => PeerState}},
+    determine_if_machine_upgrade_allowed(State1);
+handle_info_reply(State, #info_reply{}) ->
+    {State, []}.
+
+%% Queues a `noop' command carrying the max supported machine version when
+%% that version is greater than the effective one; otherwise there are no
+%% effects. The state is returned unchanged in both cases.
+determine_if_machine_upgrade_allowed(
+  #{cfg := #cfg{effective_machine_version = Current,
+                log_id = LogId}} = State) ->
+    Target = get_max_supported_machine_version(State),
+    if
+        Target > Current ->
+            ?DEBUG("~ts: max supported machine version = ~b, "
+                   "upgrading from ~b",
+                   [LogId, Target, Current]),
+            Meta = #{ts => erlang:system_time(millisecond)},
+            Upgrade = {noop, Meta, Target},
+            {State, [{next_event, cast, {command, Upgrade}}]};
+        true ->
+            {State, []}
+    end.
+
+%% Returns the highest machine version the whole cluster is known to
+%% support, i.e. the minimum of this server's own version and the versions
+%% reported by its peers.
+%%
+%% A peer which did not report its version yet caps the result at the
+%% effective machine version, so that no upgrade is attempted before every
+%% peer has answered. `min/2' is used in every branch so that the result
+%% never goes back up and does not depend on the order in which the
+%% cluster map is traversed.
+get_max_supported_machine_version(
+  #{cfg := #cfg{effective_machine_version = EffectiveMacVer,
+                id = Id}, cluster := Cluster} = State) ->
+    maps:fold(
+      fun
+          (PeerId, _PeerState, Max) when PeerId =:= Id ->
+              %% This Ra server itself: its version seeds the fold.
+              Max;
+          (_PeerId, #{machine_version := PeerMacVer}, Max) ->
+              %% The lowest version among the peers is the highest one
+              %% the cluster can upgrade to.
+              min(PeerMacVer, Max);
+          (_PeerId, _PeerState, Max) ->
+              %% Version of this peer still unknown: we must not go
+              %% beyond the effective machine version.
+              min(EffectiveMacVer, Max)
+      end, machine_version(State), Cluster).
+
%%% ===================
%%% Internal unit tests
%%% ===================
diff --git a/src/ra_server.hrl b/src/ra_server.hrl
index 18afd039..060afa99 100644
--- a/src/ra_server.hrl
+++ b/src/ra_server.hrl
@@ -8,6 +8,7 @@
-define(DEFAULT_MAX_PIPELINE_COUNT, 4096).
-define(DEFAULT_SNAPSHOT_CHUNK_SIZE, 1000000). % 1MB
-define(DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT, 30000).
+-define(DEFAULT_MACHINE_UPGRADE_STRATEGY, all).
-define(FLUSH_COMMANDS_SIZE, 16).
-record(cfg,
diff --git a/src/ra_system.erl b/src/ra_system.erl
index 94befc12..68a0664e 100644
--- a/src/ra_system.erl
+++ b/src/ra_system.erl
@@ -58,7 +58,8 @@
low_priority_commands_in_memory_size => non_neg_integer(),
server_recovery_strategy => undefined |
registered |
- {module(), atom(), list()}
+ {module(), atom(), list()},
+ machine_upgrade_strategy => ra_server:machine_upgrade_strategy()
}.
-export_type([
@@ -116,6 +117,8 @@ default_config() ->
?FLUSH_COMMANDS_SIZE),
LowPriorityInMemSize = application:get_env(ra, low_priority_commands_in_memory_size,
?FLUSH_COMMANDS_SIZE),
+ MachineUpgradeStrategy = application:get_env(ra, machine_upgrade_strategy,
+ ?DEFAULT_MACHINE_UPGRADE_STRATEGY),
#{name => default,
data_dir => DataDir,
wal_data_dir => WalDataDir,
@@ -142,6 +145,7 @@ default_config() ->
receive_snapshot_timeout => ReceiveSnapshotTimeout,
low_priority_commands_flush_size => LowPriorityCommandsFlushSize,
low_priority_commands_in_memory_size => LowPriorityInMemSize,
+ machine_upgrade_strategy => MachineUpgradeStrategy,
names => #{wal => ra_log_wal,
wal_sup => ra_log_wal_sup,
log_sup => ra_log_sup,
diff --git a/test/ra_machine_version_SUITE.erl b/test/ra_machine_version_SUITE.erl
index 7ebf8d83..665f68bb 100644
--- a/test/ra_machine_version_SUITE.erl
+++ b/test/ra_machine_version_SUITE.erl
@@ -29,11 +29,11 @@ all() ->
all_tests() ->
[
server_with_higher_version_needs_quorum_to_be_elected,
+ cluster_waits_for_all_members_to_have_latest_version_to_upgrade,
server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher,
unversioned_machine_never_sees_machine_version_command,
unversioned_can_change_to_versioned,
server_upgrades_machine_state_on_noop_command,
- lower_version_does_not_apply_until_upgraded,
server_applies_with_new_module
% snapshot_persists_machine_version
].
@@ -65,6 +65,14 @@ end_per_group(_Group, _Config) ->
init_per_testcase(TestCase, Config) ->
ok = logger:set_primary_config(level, all),
ra_server_sup_sup:remove_all(?SYS),
+ case TestCase of
+ server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher ->
+ ok = application:set_env(ra, machine_upgrade_strategy, quorum),
+ _ = ra_system:stop_default(),
+ {ok, _} = ra_system:start_default();
+ _ ->
+ ok
+ end,
ServerName1 = list_to_atom(atom_to_list(TestCase) ++ "1"),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
@@ -83,9 +91,17 @@ init_per_testcase(TestCase, Config) ->
{server_id3, {ServerName3, node()}}
| Config].
-end_per_testcase(_TestCase, Config) ->
+end_per_testcase(TestCase, Config) ->
catch ra:delete_cluster(?config(cluster, Config)),
meck:unload(),
+ case TestCase of
+ server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher ->
+ ok = application:unset_env(ra, machine_upgrade_strategy),
+ _ = ra_system:stop_default(),
+ {ok, _} = ra_system:start_default();
+ _ ->
+ ok
+ end,
ok.
%%%===================================================================
@@ -135,6 +151,78 @@ server_with_higher_version_needs_quorum_to_be_elected(Config) ->
?assertNotEqual(LastFollower, Leader3),
ok.
+cluster_waits_for_all_members_to_have_latest_version_to_upgrade(Config) ->
+ ok = ra_env:configure_logger(logger),
+ LogFile = filename:join([?config(priv_dir, Config), "ra.log"]),
+ LogConfig = #{config => #{type => {file, LogFile}}, level => debug},
+ logger:add_handler(ra_handler, logger_std_h, LogConfig),
+ ok = logger:set_primary_config(level, all),
+ ct:pal("handler config ~p", [logger:get_handler_config()]),
+ Mod = ?config(modname, Config),
+ meck:new(Mod, [non_strict]),
+ meck:expect(Mod, init, fun (_) -> init_state end),
+ meck:expect(Mod, version, fun () -> 1 end),
+ meck:expect(Mod, which_module, fun (_) -> Mod end),
+ meck:expect(Mod, apply, fun (_, _, S) -> {S, ok} end),
+ Cluster = ?config(cluster, Config),
+ ClusterName = ?config(cluster_name, Config),
+ Leader = start_cluster(ClusterName, {module, Mod, #{}}, Cluster),
+ [Follower1, Follower2] = lists:delete(Leader, Cluster),
+ timer:sleep(100),
+ %% leader and follower 1 are v2s
+ ra:stop_server(?SYS, Leader),
+ ra:stop_server(?SYS, Follower1),
+ ra:stop_server(?SYS, Follower2),
+ meck:expect(Mod, version, fun () ->
+ New = [whereis(element(1, Leader)),
+ whereis(element(1, Follower1))],
+ case lists:member(self(), New) of
+ true -> 2;
+ _ -> 1
+ end
+ end),
+ ra:restart_server(?SYS, Leader),
+ ra:restart_server(?SYS, Follower1),
+ timer:sleep(100),
+ {ok, _, _Leader1} = ra:members(Leader, 2000),
+ ra:restart_server(?SYS, Follower2),
+ %% The cluster is still using v1 even though Leader and Follower1 know
+ %% about v2.
+ lists:foreach(
+ fun(Member) ->
+ await(fun () ->
+ case ra:member_overview(Member) of
+ {ok, #{effective_machine_version := 1,
+ machine_version := 1}, _}
+ when Member == Follower2 ->
+ true;
+ {ok, #{effective_machine_version := 1,
+ machine_version := 2}, _} ->
+ true;
+ _ ->
+ false
+ end
+ end, 100)
+ end, Cluster),
+ %% Restart Follower2 with v2. The cluster should now upgrade to v2.
+ ra:stop_server(?SYS, Follower2),
+ meck:expect(Mod, version, fun () -> 2 end),
+ ra:restart_server(?SYS, Follower2),
+ lists:foreach(
+ fun(Member) ->
+ await(fun () ->
+ case ra:member_overview(Member) of
+ {ok, #{effective_machine_version := 2,
+ machine_version := 2}, _} ->
+ true;
+ _ ->
+ false
+ end
+ end, 100)
+ end, Cluster),
+
+ ok.
+
server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher(Config) ->
ok = ra_env:configure_logger(logger),
LogFile = filename:join([?config(priv_dir, Config), "ra.log"]),
@@ -336,89 +424,6 @@ server_applies_with_new_module(Config) ->
{ok, state_v1, _} = ra:consistent_query(ServerId, fun ra_lib:id/1),
ok.
-lower_version_does_not_apply_until_upgraded(Config) ->
- ok = logger:set_primary_config(level, all),
- Mod = ?config(modname, Config),
- meck:new(Mod, [non_strict]),
- meck:expect(Mod, init, fun (_) -> init_state end),
- meck:expect(Mod, version, fun () -> 1 end),
- meck:expect(Mod, which_module, fun (_) -> Mod end),
- meck:expect(Mod, apply, fun
- (_, {machine_version, _, _}, S) ->
- %% retain state for machine versions
- {S, ok};
- (_, C, _) ->
- %% any other command replaces the state
- {C, ok}
- end),
- Cluster = ?config(cluster, Config),
- ClusterName = ?config(cluster_name, Config),
- %% 3 node cluster, upgrade the first two to the later version
- %% leaving the follower on a lower version
- Leader = start_cluster(ClusterName, {module, Mod, #{}}, Cluster),
- Followers = lists:delete(Leader, Cluster),
- ct:pal("Leader1 ~w Followers ~w", [Leader, Followers]),
- meck:expect(Mod, version, fun () ->
- Self = self(),
- case whereis(element(1, Leader)) of
- Self -> 2;
- _ -> 1
- end
- end),
- timer:sleep(200),
- ra:stop_server(?SYS, Leader),
- {ok, _, Leader2} = ra:members(Followers),
- [LastFollower] = lists:delete(Leader2, Followers),
- ct:pal("Leader2 ~w LastFollower ~w", [Leader2, LastFollower]),
- ra:restart_server(?SYS, Leader),
- meck:expect(Mod, version, fun () ->
- New = [whereis(element(1, Leader)),
- whereis(element(1, Leader2))],
- case lists:member(self(), New) of
- true -> 2;
- _ -> 1
- end
- end),
- ra:stop_server(?SYS, Leader2),
- timer:sleep(500),
- {ok, _, Leader3} = ra:members(LastFollower),
- ct:pal("Leader3 ~w LastFollower ~w", [Leader3, LastFollower]),
- ra:restart_server(?SYS, Leader2),
-
- case Leader3 of
- LastFollower ->
- %% if last follower happened to be elected
- ct:pal("Leader3 is LastFollower", []),
- ra:stop_server(?SYS, Leader3),
- %% allow time for a different member to be elected
- timer:sleep(1000),
- ra:restart_server(?SYS, Leader3);
- _ -> ok
- end,
-
-
- %% process a command that should be replicated to all servers but only
- %% applied to new machine version servers
- {ok, ok, _} = ra:process_command(Leader, dummy),
- %% a little sleep to make it more likely that replication is complete to
- %% all servers and not just a quorum
- timer:sleep(100),
-
- %% the updated servers should have the same state
- {ok, {{Idx, _}, dummy}, _} = ra:local_query(Leader, fun ra_lib:id/1),
- {ok, {{Idx, _}, dummy}, _} = ra:local_query(Leader2, fun ra_lib:id/1),
- %% the last follower with the lower machine version should not have
- %% applied the last command
- {ok, {{LFIdx, _}, init_state}, _} = ra:local_query(LastFollower, fun ra_lib:id/1),
-
- ra:stop_server(?SYS, LastFollower),
- ra:restart_server(?SYS, LastFollower),
-
- {ok, {{LFIdx, _}, init_state}, _} = ra:local_query(LastFollower, fun ra_lib:id/1),
-
- ?assert(Idx > LFIdx),
- ok.
-
snapshot_persists_machine_version(_Config) ->
error({todo, ?FUNCTION_NAME}).
diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl
index a17ba982..a4663e31 100644
--- a/test/ra_server_SUITE.erl
+++ b/test/ra_server_SUITE.erl
@@ -1861,7 +1861,8 @@ candidate_election(_Config) ->
{follower, #{current_term := 7}, []}
= ra_server:handle_candidate(HighTermResult, State1),
- MacVer = 1,
+ EffectiveMacVer = 0,
+ MacVer = EffectiveMacVer + 1,
meck:expect(ra_machine, version, fun (_) -> MacVer end),
% quorum has been achieved - candidate becomes leader
@@ -1875,7 +1876,12 @@ candidate_election(_Config) ->
N4 := PeerState,
N5 := PeerState}},
[
- {next_event, cast, {command, {noop, _, MacVer}}},
+ {next_event, cast, {command, {noop, _, EffectiveMacVer}}},
+ {send_rpc, N2, {info_rpc, _, _, _}},
+ {send_rpc, N3, {info_rpc, _, _, _}},
+ {send_rpc, N4, {info_rpc, _, _, _}},
+ {send_rpc, N5, {info_rpc, _, _, _}},
+
{send_rpc, _, _},
{send_rpc, _, _},
{send_rpc, _, _},