From e9c82a0bb5eda12cc61137155759aee2f5cdd9e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 19 Dec 2024 18:45:28 +0100 Subject: [PATCH] Delay machine upgrade until all Ra servers support it [Why] Before this patch, a Ra cluster would switch to a new machine version immediately after a leader with that version was elected. Because a leader can be elected with a quorum number of candidates voting for it, it means the cluster could start using the new machine version as soon as a quorum of members support that version. Unfortunately, other members that do not support it stop applying commands because they run an older version of the machine code. For some consumers of Ra, like Khepri, this means they could cease their operation locally until the member is restarted with the new machine version. We want to delay the machine upgrade to a point where all members know about the new version. This ensures all members can continue to provide their service. [How] The machine version to use is communicated by the leader using the `noop` command. This command is the first one sent just after an election. The machine version passed was the local machine version. With this patch, the `noop` command sent after an election passes the effective machine version, except if the leader is alone in the cluster (in which case it passes the latest machine version). Therefore in a cluster, the leader will send a second `noop` command with a newer machine version later, once all members support it. To determine what each follower supports, this patch introduces two commands: * `#info_rpc{}` * `#info_reply{}` Once a leader is elected, in addition to the `noop` command, it sends an `#info_rpc{}` command to all followers. They reply with `#info_reply{}` with the machine version they support. This mechanism is not specific to machine upgrades: this could be extended in the future to communicate more details about each follower. 
Once the leader has received the machine version of every follower, it can determine the highest possible supported machine version. For that, it simply takes the lowest reported machine version (including the leader's machine version). If this version is greater than the effective machine version, the leader sends a new `noop` command with the new machine version to use. The leader sends the `#info_rpc{}` command again and again to some followers at each "tick", if these followers did not report anything yet, or if the reported machine version is lower than its own supported machine version. This takes care of followers that did not receive the initial `#info_rpc{}` and those that were restarted as part of an upgrade. Fixes #490. V2: Address comments from @kjnilsson: * Use an empty map by default in `#info_reply{}` instead of `undefined`. This simplifies the handling of the reply with a single `lists:foldl/3` instead of two. * Merge `has_enough_peer_info/1` into `get_max_supported_machine_version/1`. * Add a system-level option to restore the Ra 2.15 behavior. --- src/ra.hrl | 14 +- src/ra_server.erl | 330 +++++++++++++++++++++++++++++- src/ra_server.hrl | 1 + src/ra_system.erl | 6 +- test/ra_machine_version_SUITE.erl | 175 ++++++++-------- test/ra_server_SUITE.erl | 10 +- 6 files changed, 439 insertions(+), 97 deletions(-) diff --git a/src/ra.hrl b/src/ra.hrl index cef26c04..5f70d38f 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -70,7 +70,8 @@ voter_status => ra_voter_status(), %% indicates that a snapshot is being sent %% to the peer - status := ra_peer_status()}. + status := ra_peer_status(), + machine_version => ra_machine:version()}. -type ra_cluster() :: #{ra_server_id() => ra_peer_state()}. @@ -187,6 +188,17 @@ {query_index :: integer(), term :: ra_term()}). +-record(info_rpc, + {from :: ra_server_id(), + term :: ra_term(), + keys :: [ra_server:ra_server_info_key()]}). 
+ +-record(info_reply, + {from :: ra_server_id(), + term :: ra_term(), + keys :: [ra_server:ra_server_info_key()], + info = #{} :: ra_server:ra_server_info()}). + %% WAL defaults -define(WAL_DEFAULT_MAX_SIZE_BYTES, 256 * 1000 * 1000). -define(WAL_DEFAULT_MAX_BATCH_SIZE, 8192). diff --git a/src/ra_server.erl b/src/ra_server.erl index bffa31cd..36fa5720 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -146,6 +146,8 @@ ra_log:event() | {consistent_query, term(), ra:query_fun()} | #heartbeat_rpc{} | + #info_rpc{} | + #info_reply{} | {ra_server_id, #heartbeat_reply{}} | pipeline_rpcs. @@ -192,6 +194,18 @@ %% The simple machine config is version that can only be used for simple state %% machines that cannot access any of the advanced features. +-type machine_upgrade_strategy() :: all | quorum. +%% When a new Ra machine version should be upgraded to. +%% +%% + -type ra_server_config() :: #{id := ra_server_id(), uid := ra_uid(), %% a friendly name to refer to a particular @@ -219,6 +233,27 @@ has_changed => boolean() }. +-type ra_server_info_key() :: machine_version | atom(). +%% Key one can get in `ra_server_info()'. +%% +%% This is used in the `#info_rpc{}' to ask for a specific list of info keys. +%% +%% Key meanings: +%% +%% +%% Any atom is supported because a future version of Ra could ask for +%% something this version does not know about. + +-type ra_server_info() :: #{machine_version => ra_machine:version(), + atom() => any()}. +%% Info for a Ra server, got from `#info_reply{}'. +%% +%% In addition, the map may contain keys unknown to this version of Ra but +%% emitted by a future version. 
+ -type mutable_config() :: #{cluster_name => ra_cluster_name(), metrics_key => term(), broadcast_time => non_neg_integer(), % ms @@ -235,6 +270,8 @@ ra_server_state/0, ra_state/0, ra_server_config/0, + ra_server_info_key/0, + ra_server_info/0, mutable_config/0, ra_msg/0, machine_conf/0, @@ -246,7 +283,8 @@ command_reply_mode/0, ra_event_formatter_fun/0, effect/0, - effects/0 + effects/0, + machine_upgrade_strategy/0 ]). -spec name(ClusterName :: ra_cluster_name(), UniqueSuffix::string()) -> atom(). @@ -814,6 +852,29 @@ handle_leader(#request_vote_result{}, State) -> handle_leader(#pre_vote_result{}, State) -> %% handle to avoid logging as unhandled {leader, State, []}; +handle_leader(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}} = State0) + when CurTerm < Term -> + ?INFO("~ts: leader saw info_rpc from ~w for term ~b, abdicates term: ~b!", + [LogId, Msg#info_rpc.from, Term, CurTerm]), + {follower, update_term(Term, State0#{leader_id => undefined}), + [{next_event, Msg}]}; +handle_leader(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = info_reply_effect(State, InfoRpc), + {leader, State, [InfoReplyEffect]}; +handle_leader(#info_reply{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}} = State0) + when CurTerm < Term -> + ?INFO("~ts: leader saw info_reply from ~w for term ~b, abdicates " + "term: ~b!", + [LogId, Msg#info_reply.from, Term, CurTerm]), + {follower, update_term(Term, State0#{leader_id => undefined}), + [{next_event, Msg}]}; +handle_leader(#info_reply{} = InfoReply, State) -> + {State1, Effects} = handle_info_reply(State, InfoReply), + {leader, State1, Effects}; handle_leader({transfer_leadership, Leader}, #{cfg := #cfg{id = Leader, log_id = LogId}} = State) -> ?DEBUG("~ts: transfer leadership requested but already leader", @@ -860,8 +921,7 @@ handle_leader(Msg, State) -> {ra_state(), ra_server_state(), effects()}. 
handle_candidate(#request_vote_result{term = Term, vote_granted = true}, #{cfg := #cfg{id = Id, - log_id = LogId, - machine = Mac}, + log_id = LogId}, current_term := Term, votes := Votes, cluster := Nodes} = State0) -> @@ -871,11 +931,10 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true}, case required_quorum(Nodes) of NewVotes -> {State1, Effects} = make_all_rpcs(initialise_peers(State0)), - Noop = {noop, #{ts => erlang:system_time(millisecond)}, - ra_machine:version(Mac)}, State = State1#{leader_id => Id}, + PostElectionEffects = post_election_effects(State), {leader, maps:without([votes], State), - [{next_event, cast, {command, Noop}} | Effects]}; + PostElectionEffects ++ Effects}; _ -> {candidate, State0#{votes => NewVotes}, []} end; @@ -966,6 +1025,27 @@ handle_candidate({register_external_log_reader, Pid}, #{log := Log0} = State) -> {candidate, State#{log => Log}, Effs}; handle_candidate(force_member_change, State0) -> {follower, State0#{votes => 0}, [{next_event, force_member_change}]}; +handle_candidate(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}} = State0) + when CurTerm < Term -> + ?INFO("~ts: candidate info_rpc with higher term received ~b -> ~b", + [LogId, CurTerm, Term]), + State = update_term_and_voted_for(Term, undefined, State0), + {follower, State, [{next_event, Msg}]}; +handle_candidate(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = empty_info_reply_effect(State, InfoRpc), + {candidate, State, [InfoReplyEffect]}; +handle_candidate(#info_reply{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}} = State0) + when CurTerm < Term -> + ?INFO("~ts: candidate info_reply with higher term received ~b -> ~b", + [LogId, CurTerm, Term]), + State = update_term_and_voted_for(Term, undefined, State0), + {follower, State, [{next_event, Msg}]}; +handle_candidate(#info_reply{}, State) -> + {candidate, State, []}; handle_candidate(Msg, State) -> 
log_unhandled_msg(candidate, Msg, State), {candidate, State, [{reply, {error, {unsupported_call, Msg}}}]}. @@ -1048,6 +1128,19 @@ handle_pre_vote({register_external_log_reader, Pid}, #{log := Log0} = State) -> {pre_vote, State#{log => Log}, Effs}; handle_pre_vote(force_member_change, State0) -> {follower, State0#{votes => 0}, [{next_event, force_member_change}]}; +handle_pre_vote(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm} = State0) + when CurTerm < Term -> + {follower, State0#{votes => 0}, [{next_event, Msg}]}; +handle_pre_vote(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = empty_info_reply_effect(State, InfoRpc), + {pre_vote, State, [InfoReplyEffect]}; +handle_pre_vote(#info_reply{term = Term} = Msg, + #{current_term := CurTerm} = State0) + when CurTerm < Term -> + {follower, State0#{votes => 0}, [{next_event, Msg}]}; +handle_pre_vote(#info_reply{}, State) -> + {pre_vote, State, []}; handle_pre_vote(Msg, State) -> log_unhandled_msg(pre_vote, Msg, State), {pre_vote, State, [{reply, {error, {unsupported_call, Msg}}}]}. 
@@ -1349,6 +1442,21 @@ handle_follower(force_member_change, {ok, _, _, State, Effects} = append_cluster_change(Cluster, undefined, no_reply, State0, []), call_for_election(pre_vote, State, [{reply, ok} | Effects]); +handle_follower(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm} = State) + when CurTerm < Term -> + State1 = update_term(Term, State), + {follower, State1, [{next_event, Msg}]}; +handle_follower(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = info_reply_effect(State, InfoRpc), + {follower, State, [InfoReplyEffect]}; +handle_follower(#info_reply{term = Term} = Msg, + #{current_term := CurTerm} = State) + when CurTerm < Term -> + State1 = update_term(Term, State), + {follower, State1, [{next_event, Msg}]}; +handle_follower(#info_reply{}, State) -> + {follower, State, []}; handle_follower(Msg, State) -> log_unhandled_msg(follower, Msg, State), {follower, State, [{reply, {error, {unsupported_call, Msg}}}]}. @@ -1465,6 +1573,39 @@ handle_receive_snapshot(receive_snapshot_timeout, #{log := Log0} = State) -> handle_receive_snapshot({register_external_log_reader, Pid}, #{log := Log0} = State) -> {Log, Effs} = ra_log:register_reader(Pid, Log0), {receive_snapshot, State#{log => Log}, Effs}; +handle_receive_snapshot(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}, + log := Log0} = State) + when CurTerm < Term -> + ?INFO("~ts: follower receiving snapshot saw info_rpc from ~w for term ~b " + "abdicates term: ~b!", + [LogId, Msg#info_rpc.from, + Term, CurTerm]), + SnapState0 = ra_log:snapshot_state(Log0), + SnapState = ra_snapshot:abort_accept(SnapState0), + Log = ra_log:set_snapshot_state(SnapState, Log0), + {follower, update_term(Term, clear_leader_id(State#{log => Log})), + [{next_event, Msg}]}; +handle_receive_snapshot(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = empty_info_reply_effect(State, InfoRpc), + {receive_snapshot, State, [InfoReplyEffect]}; +handle_receive_snapshot(#info_reply{term = Term} 
= Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}, + log := Log0} = State) + when CurTerm < Term -> + ?INFO("~ts: follower receiving snapshot saw info_reply from ~w for term ~b " + "abdicates term: ~b!", + [LogId, Msg#info_reply.from, + Term, CurTerm]), + SnapState0 = ra_log:snapshot_state(Log0), + SnapState = ra_snapshot:abort_accept(SnapState0), + Log = ra_log:set_snapshot_state(SnapState, Log0), + {follower, update_term(Term, clear_leader_id(State#{log => Log})), + [{next_event, Msg}]}; +handle_receive_snapshot(#info_reply{}, State) -> + {receive_snapshot, State, []}; handle_receive_snapshot(Msg, State) -> log_unhandled_msg(receive_snapshot, Msg, State), %% drop all other events?? @@ -1533,9 +1674,10 @@ process_new_leader_queries(#{pending_consistent_queries := Pending, -spec tick(ra_server_state()) -> effects(). tick(#{cfg := #cfg{effective_machine_module = MacMod}, - machine_state := MacState}) -> + machine_state := MacState} = State) -> + InfoRpcEffects = info_rpc_effects(State), Now = erlang:system_time(millisecond), - ra_machine:tick(MacMod, Now, MacState). + InfoRpcEffects ++ ra_machine:tick(MacMod, Now, MacState). -spec log_tick(ra_server_state()) -> ra_server_state(). log_tick(#{cfg := #cfg{}, @@ -3314,6 +3456,178 @@ after_log_append_reply(Cmd, Idx, Term, Effects0) -> Effects0 end. +post_election_effects( + #{cfg := #cfg{effective_machine_version = EffectiveMacVer, + machine = Mac, + system_config = SystemConfig}} = State) -> + Peers = peers(State), + PeerIds = maps:keys(Peers), + + MachineUpgradeStrategy = maps:get(machine_upgrade_strategy, SystemConfig, + ?DEFAULT_MACHINE_UPGRADE_STRATEGY), + UpgradeMachineNow = (PeerIds =:= [] orelse + MachineUpgradeStrategy =:= quorum), + + case UpgradeMachineNow of + true -> + %% This node is alone in the cluster, we can send the `noop' + %% command with the newer machine version right away. 
+ MacVer = ra_machine:version(Mac), + Noop = {noop, #{ts => erlang:system_time(millisecond)}, + MacVer}, + NoopEffect = {next_event, cast, {command, Noop}}, + [NoopEffect]; + false -> + %% We continue to send the `noop' command immediately after + %% becoming a leader, but compared to Ra 2.15 and older, we don't + %% set the machine version to the latest: we keep the same + %% effective machine version for now. + %% + %% However, we query info keys from all peers, including their + %% supported machine version. The replies will be used to + %% determine the max supported machine version and when it is + %% greater than the effective one, another `noop' command will be + %% sent. + Noop = {noop, #{ts => erlang:system_time(millisecond)}, + EffectiveMacVer}, + NoopEffect = {next_event, cast, {command, Noop}}, + InfoRpcEffects = info_rpc_effects(State), + [NoopEffect | InfoRpcEffects] + end. + +info_rpc_effects(#{cfg := #cfg{id = Id}, cluster := Cluster} = State) -> + InfoRpcEffects = maps:fold( + fun + (PeerId, _, Acc) when PeerId =:= Id -> + Acc; + (PeerId, _, Acc) -> + Acc ++ info_rpc_effects_for_peer(State, PeerId) + end, [], Cluster), + InfoRpcEffects. + +info_rpc_effects_for_peer( + #{cluster := Cluster, current_term := CurTerm} = State, PeerId) -> + %% We determine if we need to ask (for the fist time or again) the info + %% from a peer. + SendRpc = case Cluster of + #{PeerId := #{machine_version := PeerMacVer}} -> + %% We have the machine version of the peer, but we want + %% to ask again if that version is old, in the hope the + %% peer restarted and was updated. + MacVer = machine_version(State), + PeerMacVer < MacVer; + _ -> + %% We don't have any details about the peer, we ask. + true + end, + case SendRpc of + true -> + %% We ask for all info keys currently. If we ask for specific + %% keys, we will have to handle merging of already known info keys + %% and updates. 
+ Id = id(State), + Command = #info_rpc{from = Id, + term = CurTerm, + keys = [machine_version]}, + [{send_rpc, PeerId, Command}]; + false -> + [] + end. + +info_reply_effect(#{current_term := CurTerm} = State, + #info_rpc{from = FromId, keys = Keys}) -> + Id = id(State), + Info = ra_server_info(State, Keys), + InfoReply = #info_reply{from = Id, + term = CurTerm, + keys = Keys, + info = Info}, + {cast, FromId, InfoReply}. + +empty_info_reply_effect(#{current_term := CurTerm} = State, + #info_rpc{from = FromId, keys = Keys}) -> + Id = id(State), + InfoReply = #info_reply{from = Id, + term = CurTerm, + keys = Keys, + info = #{}}, + {cast, FromId, InfoReply}. + +ra_server_info(State) -> + MacVer = machine_version(State), + #{machine_version => MacVer}. + +ra_server_info(State, Keys) -> + %% Note that the node that asked may ask for keys we don't know. + Info0 = ra_server_info(State), + Info1 = maps:filter( + fun(Key, _Value) -> + lists:member(Key, Keys) + end, Info0), + Info1. + +handle_info_reply( + #{cluster := Cluster} = State, + #info_reply{from = PeerId, keys = Keys, info = Info}) -> + PeerState0 = maps:get(PeerId, Cluster), + PeerState1 = lists:foldl(fun(Key, PS) -> + case Info of + #{Key := Value} -> + PS#{Key => Value}; + _ -> + maps:remove(Key, PS) + end + end, PeerState0, Keys), + Cluster1 = Cluster#{PeerId => PeerState1}, + State1 = State#{cluster => Cluster1}, + determine_if_machine_upgrade_allowed(State1). + +determine_if_machine_upgrade_allowed( + #{cfg := #cfg{effective_machine_version = EffectiveMacVer, + log_id = LogId}} = State) -> + Effects = case get_max_supported_machine_version(State) of + MaxSupMacVer + when MaxSupMacVer > EffectiveMacVer -> + ?DEBUG( + "~ts: max supported machine version = ~b, " + "upgrading from ~b", + [LogId, MaxSupMacVer, EffectiveMacVer]), + Noop = {noop, + #{ts => erlang:system_time(millisecond)}, + MaxSupMacVer}, + [{next_event, cast, {command, Noop}}]; + _ -> + [] + end, + {State, Effects}. 
+ +get_max_supported_machine_version( + #{cfg := #cfg{effective_machine_version = EffectiveMacVer, + id = Id}, cluster := Cluster} = State) -> + MacVer = machine_version(State), + MaxSupMacVer = maps:fold( + fun + (PeerId, #{machine_version := PeerMacVer}, Max) + when PeerId =/= Id + andalso PeerMacVer < Max -> + %% This peer has a lower machine version than the + %% previous peers in the list so far. This becomes + %% the highest machine version we can upgrade to. + PeerMacVer; + (PeerId, PeerState, _Max) + when PeerId =/= Id andalso + not is_map_key(machine_version, PeerState) -> + %% We don't have the machine version of one of the + %% peers yet. We need to stay on the effective + %% machine version. + EffectiveMacVer; + (_PeerId, _PeerState, Max) -> + %% Either this is this Ra server or this is a peer + %% using a newer machine version. + Max + end, MacVer, Cluster), + MaxSupMacVer. + %%% =================== %%% Internal unit tests %%% =================== diff --git a/src/ra_server.hrl b/src/ra_server.hrl index 18afd039..060afa99 100644 --- a/src/ra_server.hrl +++ b/src/ra_server.hrl @@ -8,6 +8,7 @@ -define(DEFAULT_MAX_PIPELINE_COUNT, 4096). -define(DEFAULT_SNAPSHOT_CHUNK_SIZE, 1000000). % 1MB -define(DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT, 30000). +-define(DEFAULT_MACHINE_UPGRADE_STRATEGY, all). -define(FLUSH_COMMANDS_SIZE, 16). -record(cfg, diff --git a/src/ra_system.erl b/src/ra_system.erl index 94befc12..68a0664e 100644 --- a/src/ra_system.erl +++ b/src/ra_system.erl @@ -58,7 +58,8 @@ low_priority_commands_in_memory_size => non_neg_integer(), server_recovery_strategy => undefined | registered | - {module(), atom(), list()} + {module(), atom(), list()}, + machine_upgrade_strategy => ra_server:machine_upgrade_strategy() }. 
-export_type([ @@ -116,6 +117,8 @@ default_config() -> ?FLUSH_COMMANDS_SIZE), LowPriorityInMemSize = application:get_env(ra, low_priority_commands_in_memory_size, ?FLUSH_COMMANDS_SIZE), + MachineUpgradeStrategy = application:get_env(ra, machine_upgrade_strategy, + ?DEFAULT_MACHINE_UPGRADE_STRATEGY), #{name => default, data_dir => DataDir, wal_data_dir => WalDataDir, @@ -142,6 +145,7 @@ default_config() -> receive_snapshot_timeout => ReceiveSnapshotTimeout, low_priority_commands_flush_size => LowPriorityCommandsFlushSize, low_priority_commands_in_memory_size => LowPriorityInMemSize, + machine_upgrade_strategy => MachineUpgradeStrategy, names => #{wal => ra_log_wal, wal_sup => ra_log_wal_sup, log_sup => ra_log_sup, diff --git a/test/ra_machine_version_SUITE.erl b/test/ra_machine_version_SUITE.erl index 7ebf8d83..665f68bb 100644 --- a/test/ra_machine_version_SUITE.erl +++ b/test/ra_machine_version_SUITE.erl @@ -29,11 +29,11 @@ all() -> all_tests() -> [ server_with_higher_version_needs_quorum_to_be_elected, + cluster_waits_for_all_members_to_have_latest_version_to_upgrade, server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher, unversioned_machine_never_sees_machine_version_command, unversioned_can_change_to_versioned, server_upgrades_machine_state_on_noop_command, - lower_version_does_not_apply_until_upgraded, server_applies_with_new_module % snapshot_persists_machine_version ]. 
@@ -65,6 +65,14 @@ end_per_group(_Group, _Config) -> init_per_testcase(TestCase, Config) -> ok = logger:set_primary_config(level, all), ra_server_sup_sup:remove_all(?SYS), + case TestCase of + server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher -> + ok = application:set_env(ra, machine_upgrade_strategy, quorum), + _ = ra_system:stop_default(), + {ok, _} = ra_system:start_default(); + _ -> + ok + end, ServerName1 = list_to_atom(atom_to_list(TestCase) ++ "1"), ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), @@ -83,9 +91,17 @@ init_per_testcase(TestCase, Config) -> {server_id3, {ServerName3, node()}} | Config]. -end_per_testcase(_TestCase, Config) -> +end_per_testcase(TestCase, Config) -> catch ra:delete_cluster(?config(cluster, Config)), meck:unload(), + case TestCase of + server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher -> + ok = application:unset_env(ra, machine_upgrade_strategy), + _ = ra_system:stop_default(), + {ok, _} = ra_system:start_default(); + _ -> + ok + end, ok. %%%=================================================================== @@ -135,6 +151,78 @@ server_with_higher_version_needs_quorum_to_be_elected(Config) -> ?assertNotEqual(LastFollower, Leader3), ok. 
+cluster_waits_for_all_members_to_have_latest_version_to_upgrade(Config) -> + ok = ra_env:configure_logger(logger), + LogFile = filename:join([?config(priv_dir, Config), "ra.log"]), + LogConfig = #{config => #{type => {file, LogFile}}, level => debug}, + logger:add_handler(ra_handler, logger_std_h, LogConfig), + ok = logger:set_primary_config(level, all), + ct:pal("handler config ~p", [logger:get_handler_config()]), + Mod = ?config(modname, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (_) -> init_state end), + meck:expect(Mod, version, fun () -> 1 end), + meck:expect(Mod, which_module, fun (_) -> Mod end), + meck:expect(Mod, apply, fun (_, _, S) -> {S, ok} end), + Cluster = ?config(cluster, Config), + ClusterName = ?config(cluster_name, Config), + Leader = start_cluster(ClusterName, {module, Mod, #{}}, Cluster), + [Follower1, Follower2] = lists:delete(Leader, Cluster), + timer:sleep(100), + %% leader and follower 1 are v2s + ra:stop_server(?SYS, Leader), + ra:stop_server(?SYS, Follower1), + ra:stop_server(?SYS, Follower2), + meck:expect(Mod, version, fun () -> + New = [whereis(element(1, Leader)), + whereis(element(1, Follower1))], + case lists:member(self(), New) of + true -> 2; + _ -> 1 + end + end), + ra:restart_server(?SYS, Leader), + ra:restart_server(?SYS, Follower1), + timer:sleep(100), + {ok, _, _Leader1} = ra:members(Leader, 2000), + ra:restart_server(?SYS, Follower2), + %% The cluster is still using v1 even though Leader and Follower2 knows + %% about v2. + lists:foreach( + fun(Member) -> + await(fun () -> + case ra:member_overview(Member) of + {ok, #{effective_machine_version := 1, + machine_version := 1}, _} + when Member == Follower2 -> + true; + {ok, #{effective_machine_version := 1, + machine_version := 2}, _} -> + true; + _ -> + false + end + end, 100) + end, Cluster), + %% Restart Follower2 with v2. The cluster should now upgrade to v2. 
+ ra:stop_server(?SYS, Follower2), + meck:expect(Mod, version, fun () -> 2 end), + ra:restart_server(?SYS, Follower2), + lists:foreach( + fun(Member) -> + await(fun () -> + case ra:member_overview(Member) of + {ok, #{effective_machine_version := 2, + machine_version := 2}, _} -> + true; + _ -> + false + end + end, 100) + end, Cluster), + + ok. + server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher(Config) -> ok = ra_env:configure_logger(logger), LogFile = filename:join([?config(priv_dir, Config), "ra.log"]), @@ -336,89 +424,6 @@ server_applies_with_new_module(Config) -> {ok, state_v1, _} = ra:consistent_query(ServerId, fun ra_lib:id/1), ok. -lower_version_does_not_apply_until_upgraded(Config) -> - ok = logger:set_primary_config(level, all), - Mod = ?config(modname, Config), - meck:new(Mod, [non_strict]), - meck:expect(Mod, init, fun (_) -> init_state end), - meck:expect(Mod, version, fun () -> 1 end), - meck:expect(Mod, which_module, fun (_) -> Mod end), - meck:expect(Mod, apply, fun - (_, {machine_version, _, _}, S) -> - %% retain state for machine versions - {S, ok}; - (_, C, _) -> - %% any other command replaces the state - {C, ok} - end), - Cluster = ?config(cluster, Config), - ClusterName = ?config(cluster_name, Config), - %% 3 node cluster, upgrade the first two to the later version - %% leaving the follower on a lower version - Leader = start_cluster(ClusterName, {module, Mod, #{}}, Cluster), - Followers = lists:delete(Leader, Cluster), - ct:pal("Leader1 ~w Followers ~w", [Leader, Followers]), - meck:expect(Mod, version, fun () -> - Self = self(), - case whereis(element(1, Leader)) of - Self -> 2; - _ -> 1 - end - end), - timer:sleep(200), - ra:stop_server(?SYS, Leader), - {ok, _, Leader2} = ra:members(Followers), - [LastFollower] = lists:delete(Leader2, Followers), - ct:pal("Leader2 ~w LastFollower ~w", [Leader2, LastFollower]), - ra:restart_server(?SYS, Leader), - meck:expect(Mod, version, fun () -> - New = [whereis(element(1, 
Leader)), - whereis(element(1, Leader2))], - case lists:member(self(), New) of - true -> 2; - _ -> 1 - end - end), - ra:stop_server(?SYS, Leader2), - timer:sleep(500), - {ok, _, Leader3} = ra:members(LastFollower), - ct:pal("Leader3 ~w LastFollower ~w", [Leader3, LastFollower]), - ra:restart_server(?SYS, Leader2), - - case Leader3 of - LastFollower -> - %% if last follower happened to be elected - ct:pal("Leader3 is LastFollower", []), - ra:stop_server(?SYS, Leader3), - %% allow time for a different member to be elected - timer:sleep(1000), - ra:restart_server(?SYS, Leader3); - _ -> ok - end, - - - %% process a command that should be replicated to all servers but only - %% applied to new machine version servers - {ok, ok, _} = ra:process_command(Leader, dummy), - %% a little sleep to make it more likely that replication is complete to - %% all servers and not just a quorum - timer:sleep(100), - - %% the updated servers should have the same state - {ok, {{Idx, _}, dummy}, _} = ra:local_query(Leader, fun ra_lib:id/1), - {ok, {{Idx, _}, dummy}, _} = ra:local_query(Leader2, fun ra_lib:id/1), - %% the last follower with the lower machine version should not have - %% applied the last command - {ok, {{LFIdx, _}, init_state}, _} = ra:local_query(LastFollower, fun ra_lib:id/1), - - ra:stop_server(?SYS, LastFollower), - ra:restart_server(?SYS, LastFollower), - - {ok, {{LFIdx, _}, init_state}, _} = ra:local_query(LastFollower, fun ra_lib:id/1), - - ?assert(Idx > LFIdx), - ok. - snapshot_persists_machine_version(_Config) -> error({todo, ?FUNCTION_NAME}). 
diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index a17ba982..a4663e31 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -1861,7 +1861,8 @@ candidate_election(_Config) -> {follower, #{current_term := 7}, []} = ra_server:handle_candidate(HighTermResult, State1), - MacVer = 1, + EffectiveMacVer = 0, + MacVer = EffectiveMacVer + 1, meck:expect(ra_machine, version, fun (_) -> MacVer end), % quorum has been achieved - candidate becomes leader @@ -1875,7 +1876,12 @@ candidate_election(_Config) -> N4 := PeerState, N5 := PeerState}}, [ - {next_event, cast, {command, {noop, _, MacVer}}}, + {next_event, cast, {command, {noop, _, EffectiveMacVer}}}, + {send_rpc, N2, {info_rpc, _, _, _}}, + {send_rpc, N3, {info_rpc, _, _, _}}, + {send_rpc, N4, {info_rpc, _, _, _}}, + {send_rpc, N5, {info_rpc, _, _, _}}, + {send_rpc, _, _}, {send_rpc, _, _}, {send_rpc, _, _},